List: forgerock-opendj-dev
Subject: [Opendj-dev] [10840] trunk/opends: OPENDJ-1453 (CR-3870) Replica offline messages should be synced w
From: noreply () forgerock ! org
Date: 2014-06-26 13:19:57
Message-ID: 20140626131957.B1E17408FD () sources ! internal ! forgerock ! com
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[10840] trunk/opends: OPENDJ-1453 (CR-3870) Replica offline messages should be \
synced with updates</title> </head>
<body>
<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: \
verdana,arial,helvetica,sans-serif; font-size: 10pt; } #msg dl a { font-weight: \
bold} #msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: \
bold; } #msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: \
6px; } #logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em \
0; } #logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg \
h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; } \
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; \
} #logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: \
-1.5em; padding-left: 1.5em; } #logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em \
1em 0 1em; background: white;} #logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid \
#fa0; border-bottom: 1px solid #fa0; background: #fff; } #logmsg table th { \
text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted \
#fa0; } #logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: \
0.2em 0.5em; } #logmsg table thead th { text-align: center; border-bottom: 1px solid \
#fa0; } #logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: \
6px; } #patch { width: 100%; }
#patch h4 {font-family: \
verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, \
#patch .copfile {border:1px solid #ccc;margin:10px 0;} #patch ins \
{background:#dfd;text-decoration:none;display:block;padding:0 10px;} #patch del \
{background:#fdd;text-decoration:none;display:block;padding:0 10px;} #patch .lines, \
.info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a \
href="http://sources.forgerock.org/changelog/opendj/?cs=10840">10840</a></dd> \
<dt>Author</dt> <dd>JnRouvignac</dd> <dt>Date</dt> <dd>2014-06-26 14:19:57 +0100 \
(Thu, 26 Jun 2014)</dd> </dl>
<h3>Log Message</h3>
<pre><a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1453">OPENDJ-1453</a> \
(CR-3870) Replica offline messages should be synced with updates
Added a new ReplicaOfflineMsg to communicate that a replica is offline. \
ReplicaOfflineMsg extends UpdateMsg. It is possible that <a \
href="https://bugster.forgerock.org/jira/browse/OPENDJ-1260">OPENDJ-1260</a> will \
piggyback on this new message type. This patch makes a DS send a ReplicaOfflineMsg \
to its preferred RS on shutdown. This works when the DS and RS are split, but works only \
about 50% of the time (a rough estimate) with a combined DS-RS, because there is a race \
condition between shutdown and the ReplicaOfflineMsg being forwarded by the collocated RS. \
Finally, RSs can communicate such ReplicaOfflineMsg to other RSs via ServerWriter / \
ServerReader.
One more item: due to this change, replication queries the changelogstate \
a lot more often. This does not play well with ExternalChangeLogTest and \
GenerationIdTest, and required changing the file-based changelog \
ReplicationEnvironment class. To reduce I/O, it might be necessary to maintain an \
in-memory copy of the changelogstate. I might do this in a subsequent commit.
ReplicaOfflineMsg.java: ADDED
ReplicationMsg.java:
Added support for ReplicaOfflineMsg.
ProtocolVersion.java:
Updated javadoc for REPLICATION_PROTOCOL_V8.
LDAPReplicationDomain.java, ReplicationDomain.java:
Added publishReplicaOfflineMsg().
ReplicationBroker.java:
In stop(), called ReplicationDomain.publishReplicaOfflineMsg().
ReplicationServerDomain.java:
In publishUpdateMsg(UpdateMsg), handled ReplicaOfflineMsg.
PendingChange.java:
Changed msg field + getMsg() return type from LDAPUpdateMsg to UpdateMsg.
Added getLDAPUpdateMsg().
PendingChanges.java:
Added putReplicaOfflineMsg().
In pushCommittedChanges(), changed the code as a consequence of the change to \
PendingChange.getMsg().
RemotePendingChanges.java:
Changed the code as a consequence of the change to PendingChange.getMsg().
JEChangelogDB.java:
Added ReplicaOfflineCursor inner class, decorator for a DBCursor + used it in \
getCursorFrom() to return ReplicaOfflineMsg to ServerWriter thread.
JEChangelogDBTest.java: ADDED
Tested ReplicaOfflineCursor.
ECLServerHandler.java:
In getNextMessage(), ignored ReplicaOfflineMsg, which is useless to searches on \
cn=changelog.
SynchronizationMsgTest.java:
Added a test for ReplicaOfflineMsg (de)serialization.
FakeUpdateMsg.java: ADDED, extracted from CompositeDBCursorTest
CompositeDBCursorTest.java:
Replaced call to newUpdateMsg() by calling FakeUpdateMsg ctor.
ReplicationEnvironment.java:
Fixes to make ExternalChangeLogTest and GenerationIdTest work.
replication.properties:
Removed now useless messages.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#trunkopendssrcmessagesmessagesreplicationproperties">trunk/opends/src/messages/messages/replication.properties</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginLDAPReplicationDoma \
injava">trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginPendingChangejava"> \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginPendingChangesjava" \
>trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginRemotePendingChanges \
java">trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationprotocolProtocolVersionja \
va">trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationprotocolReplicationMsgjav \
a">trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserverECLServerHandlerjav \
a">trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserverReplicationServerDo \
mainjava">trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogfileFileCh \
angelogDBjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogfileReplic \
ationEnvironmentjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogjeJEChange \
logDBjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserviceReplicationBrokerj \
ava">trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserviceReplicationDomainj \
ava">trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java</a></li>
<li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationproto \
colSynchronizationMsgTestjava">trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java</a></li>
<li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserve \
rchangelogjeCompositeDBCursorTestjava">trunk/opends/tests/unit-tests-testng/src/server \
/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java</a></li>
</ul>
<h3>Added Paths</h3>
<ul>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationprotocolReplicaOfflineMsgj \
ava">trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java</a></li>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogjeReplicaO \
fflineCursorjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java</a></li>
<li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserve \
rchangelogjeFakeUpdateMsgjava">trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java</a></li>
<li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserve \
rchangelogjeReplicaOfflineCursorTestjava">trunk/opends/tests/unit-tests-testng/src/ser \
ver/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="trunkopendssrcmessagesmessagesreplicationproperties"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/messages/messages/replication.properties (10839 => 10840)</h4> <pre \
class="diff"><span> <span class="info">--- \
trunk/opends/src/messages/messages/replication.properties 2014-06-25 15:43:38 UTC \
(rev 10839)
+++ trunk/opends/src/messages/messages/replication.properties 2014-06-26 13:19:57 UTC \
(rev 10840) </span><span class="lines">@@ -533,9 +533,6 @@
</span><span class="cx"> change %s to replicaDB %s %s because flushing thread is \
shutting down </span><span class="cx"> \
SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \ \
</span><span class="cx"> state from root path '%s' : directory might not exist \
</span><del>-SEVERE_ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when \
retrieving \
- changelog state from root path '%s' : no generation id file found in domain \
- directory '%s'
</del><span class="cx"> \
SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
\ </span><span class="cx"> changelog state from root path '%s' : IO error on domain \
directory '%s' when retrieving \ </span><span class="cx"> list of server ids
</span><span class="lines">@@ -603,8 +600,6 @@
</span><span class="cx"> head log file from '%s' to '%s'
</span><span class="cx"> INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log \
file '%s', \ </span><span class="cx"> size of head log file is %d bytes
</span><del>-SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could \
not add replica \
- offline for domain %s and server id %d because the path '%s' does not exist
</del><span class="cx"> \
SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write \
offline \ </span><span class="cx"> replica information for domain %s and server id \
%d, using path '%s' (offline CSN is %s) </span><span class="cx"> \
SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica \
offline \ </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginLDAPReplicationDomainjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -2012,6 +2012,13 @@
</span><span class="cx"> addOperation.setAttachment(SYNCHROCONTEXT, ctx);
</span><span class="cx"> }
</span><span class="cx">
</span><ins>+ /** {@inheritDoc} */
+ @Override
+ public void publishReplicaOfflineMsg()
+ {
+ pendingChanges.putReplicaOfflineMsg();
+ }
+
</ins><span class="cx"> /**
</span><span class="cx"> * Check if an operation must be synchronized.
</span><span class="cx"> * Also update the list of pending changes and the server \
RUV </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginPendingChangejava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -29,6 +29,7 @@
</span><span class="cx"> import org.opends.server.replication.common.CSN;
</span><span class="cx"> import org.opends.server.replication.common.ServerState;
</span><span class="cx"> import org.opends.server.replication.protocol.LDAPUpdateMsg;
</span><ins>+import org.opends.server.replication.protocol.UpdateMsg;
</ins><span class="cx"> import org.opends.server.types.operation.PluginOperation;
</span><span class="cx">
</span><span class="cx"> /**
</span><span class="lines">@@ -39,7 +40,7 @@
</span><span class="cx"> {
</span><span class="cx"> private final CSN csn;
</span><span class="cx"> private boolean committed;
</span><del>- private LDAPUpdateMsg msg;
</del><ins>+ private UpdateMsg msg;
</ins><span class="cx"> private final PluginOperation op;
</span><span class="cx"> private ServerState dependencyState;
</span><span class="cx">
</span><span class="lines">@@ -49,7 +50,7 @@
</span><span class="cx"> * @param op the operation to use
</span><span class="cx"> * @param msg the message to use (can be null for local \
operations) </span><span class="cx"> */
</span><del>- PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
</del><ins>+ PendingChange(CSN csn, PluginOperation op, UpdateMsg msg)
</ins><span class="cx"> {
</span><span class="cx"> this.csn = csn;
</span><span class="cx"> this.committed = false;
</span><span class="lines">@@ -89,12 +90,27 @@
</span><span class="cx"> * @return the message if operation was a replication \
operation </span><span class="cx"> * null if the operation was a local operation
</span><span class="cx"> */
</span><del>- public LDAPUpdateMsg getMsg()
</del><ins>+ public UpdateMsg getMsg()
</ins><span class="cx"> {
</span><span class="cx"> return msg;
</span><span class="cx"> }
</span><span class="cx">
</span><span class="cx"> /**
</span><ins>+ * Get the LDAPUpdateMsg associated to this PendingChange.
+ *
+ * @return the LDAPUpdateMsg if operation was a replication operation, null
+ * otherwise
+ */
+ public LDAPUpdateMsg getLDAPUpdateMsg()
+ {
+ if (msg instanceof LDAPUpdateMsg)
+ {
+ return (LDAPUpdateMsg) msg;
+ }
+ return null;
+ }
+
+ /**
</ins><span class="cx"> * Set the message associated to the PendingChange.
</span><span class="cx"> * @param msg the message
</span><span class="cx"> */
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginPendingChangesjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -33,6 +33,8 @@
</span><span class="cx"> import org.opends.server.replication.common.CSN;
</span><span class="cx"> import org.opends.server.replication.common.CSNGenerator;
</span><span class="cx"> import org.opends.server.replication.protocol.LDAPUpdateMsg;
</span><ins>+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
</ins><span class="cx"> import \
org.opends.server.replication.service.ReplicationDomain; </span><span class="cx"> \
import org.opends.server.types.operation.PluginOperation; </span><span class="cx">
</span><span class="lines">@@ -136,6 +138,20 @@
</span><span class="cx"> }
</span><span class="cx">
</span><span class="cx"> /**
</span><ins>+ * Add a replica offline message to the pending list.
+ */
+ public synchronized void putReplicaOfflineMsg()
+ {
+ final CSN offlineCSN = csnGenerator.newCSN();
+ final PendingChange pendingChange =
+ new PendingChange(offlineCSN, null, new ReplicaOfflineMsg(offlineCSN));
+ pendingChange.setCommitted(true);
+
+ pendingChanges.put(offlineCSN, pendingChange);
+ pushCommittedChanges();
+ }
+
+ /**
</ins><span class="cx"> * Push all committed local changes to the \
replicationServer service. </span><span class="cx"> */
</span><span class="cx"> synchronized void pushCommittedChanges()
</span><span class="lines">@@ -152,20 +168,26 @@
</span><span class="cx"> while (firstChange != null && \
firstChange.isCommitted()) </span><span class="cx"> {
</span><span class="cx"> final PluginOperation op = firstChange.getOp();
</span><del>- if (op != null && !op.isSynchronizationOperation())
</del><ins>+ final UpdateMsg msg = firstChange.getMsg();
+ if (msg instanceof LDAPUpdateMsg
+ && op != null
+ && !op.isSynchronizationOperation())
</ins><span class="cx"> {
</span><del>- final LDAPUpdateMsg updateMsg = firstChange.getMsg();
</del><span class="cx"> if (!recoveringOldChanges)
</span><span class="cx"> {
</span><del>- domain.publish(updateMsg);
</del><ins>+ domain.publish(msg);
</ins><span class="cx"> }
</span><span class="cx"> else
</span><span class="cx"> {
</span><span class="cx"> // do not push updates until the RS catches up.
</span><span class="cx"> // @see #setRecovering(boolean)
</span><del>- domain.getServerState().update(updateMsg.getCSN());
</del><ins>+ domain.getServerState().update(msg.getCSN());
</ins><span class="cx"> }
</span><span class="cx"> }
</span><ins>+ else if (msg instanceof ReplicaOfflineMsg)
+ {
+ domain.publish(msg);
+ }
</ins><span class="cx">
</span><span class="cx"> // false warning: firstEntry will not be null if \
firstChange is not null </span><span class="cx"> \
pendingChanges.remove(firstEntry.getKey()); </span></span></pre></div>
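The ordering that putReplicaOfflineMsg() relies on can be sketched as follows. This is a minimal illustration and not the OpenDJ code: PendingQueueSketch, its Entry class, and the use of plain long values in place of CSNs are all hypothetical. It shows why an offline marker queued behind an uncommitted update is only published once that update has been committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the pending-changes queue; not the OpenDJ classes. */
final class PendingQueueSketch
{
  /** Simplified pending entry: a label plus a committed flag. */
  static final class Entry
  {
    final String label;
    boolean committed;

    Entry(String label, boolean committed)
    {
      this.label = label;
      this.committed = committed;
    }
  }

  // Keys play the role of CSNs: the sorted map keeps entries in CSN order.
  private final TreeMap<Long, Entry> pending = new TreeMap<Long, Entry>();
  private final List<String> published = new ArrayList<String>();
  private long nextCsn = 1;

  /** Registers a local update that is not yet committed. */
  synchronized long putUpdate(String label)
  {
    final long csn = nextCsn++;
    pending.put(csn, new Entry(label, false));
    return csn;
  }

  /** Marks an update as committed, then publishes whatever is ready. */
  synchronized void commit(long csn)
  {
    final Entry entry = pending.get(csn);
    if (entry != null)
    {
      entry.committed = true;
    }
    pushCommitted();
  }

  /** Queues an already-committed offline marker with a fresh CSN. */
  synchronized void putOffline()
  {
    pending.put(nextCsn++, new Entry("offline", true));
    pushCommitted();
  }

  /** Publishes entries from the head of the map while they are committed. */
  private void pushCommitted()
  {
    Map.Entry<Long, Entry> first = pending.firstEntry();
    while (first != null && first.getValue().committed)
    {
      published.add(first.getValue().label);
      pending.remove(first.getKey());
      first = pending.firstEntry();
    }
  }

  /** Returns a copy of the labels published so far, in publication order. */
  synchronized List<String> published()
  {
    return new ArrayList<String>(published);
  }
}
```

Because the offline marker takes a CSN newer than every pending update, it cannot overtake them in this model, which is the "synced with updates" property the log message describes.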
<a id="trunkopendssrcserverorgopendsserverreplicationpluginRemotePendingChangesjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -157,7 +157,7 @@
</span><span class="cx"> if (change.dependenciesIsCovered(state))
</span><span class="cx"> {
</span><span class="cx"> dependentChanges.remove(change);
</span><del>- return change.getMsg();
</del><ins>+ return change.getLDAPUpdateMsg();
</ins><span class="cx"> }
</span><span class="cx"> }
</span><span class="cx"> return null;
</span><span class="lines">@@ -208,7 +208,7 @@
</span><span class="cx"> {
</span><span class="cx"> if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx"> {
</span><del>- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+ final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx"> if (pendingMsg != \
null) </span><span class="cx"> {
</span><span class="cx"> if (pendingMsg instanceof DeleteMsg)
</span><span class="lines">@@ -300,7 +300,7 @@
</span><span class="cx"> {
</span><span class="cx"> if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx"> {
</span><del>- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+ final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx"> if (pendingMsg \
instanceof AddMsg) </span><span class="cx"> {
</span><span class="cx"> // Check if the operation to be run is an \
addOperation on a same DN. </span><span class="lines">@@ -350,13 +350,13 @@
</span><span class="cx"> return false;
</span><span class="cx"> }
</span><span class="cx">
</span><del>- final DN targetDN = change.getMsg().getDN();
</del><ins>+ final DN targetDN = change.getLDAPUpdateMsg().getDN();
</ins><span class="cx">
</span><span class="cx"> for (PendingChange pendingChange : \
pendingChanges.values()) </span><span class="cx"> {
</span><span class="cx"> if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx"> {
</span><del>- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+ final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx"> if (pendingMsg != \
null) </span><span class="cx"> {
</span><span class="cx"> if (pendingMsg instanceof DeleteMsg)
</span><span class="lines">@@ -442,7 +442,7 @@
</span><span class="cx"> {
</span><span class="cx"> if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx"> {
</span><del>- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+ final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx"> if (pendingMsg != \
null) </span><span class="cx"> {
</span><span class="cx"> if (pendingMsg instanceof DeleteMsg)
</span></span></pre></div>
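With getMsg() now returning UpdateMsg, callers that need the LDAP subtype go through getLDAPUpdateMsg(), which returns null for any other message type. A minimal sketch of that narrowing accessor and a null-safe caller is shown below. The class names (NarrowingAccessorSketch, LdapUpdateMsg, OfflineMsg, Pending) and the targetDnOrNull() helper are hypothetical stand-ins, not the OpenDJ types.

```java
/** Hypothetical stand-ins for illustration; these are not the OpenDJ classes. */
final class NarrowingAccessorSketch
{
  static class UpdateMsg { }

  /** Stand-in for an LDAP update carrying a target DN. */
  static final class LdapUpdateMsg extends UpdateMsg
  {
    final String dn;

    LdapUpdateMsg(String dn)
    {
      this.dn = dn;
    }
  }

  /** Stand-in for a replica offline marker: it has no DN. */
  static final class OfflineMsg extends UpdateMsg { }

  static final class Pending
  {
    private final UpdateMsg msg;

    Pending(UpdateMsg msg)
    {
      this.msg = msg;
    }

    UpdateMsg getMsg()
    {
      return msg;
    }

    /** Returns the message narrowed to the LDAP subtype, or null otherwise. */
    LdapUpdateMsg getLdapUpdateMsg()
    {
      return msg instanceof LdapUpdateMsg ? (LdapUpdateMsg) msg : null;
    }
  }

  /** Null-safe caller: returns the target DN, or null when there is none. */
  static String targetDnOrNull(Pending change)
  {
    final LdapUpdateMsg ldapMsg = change.getLdapUpdateMsg();
    return ldapMsg != null ? ldapMsg.dn : null;
  }
}
```

A caller that dereferences the accessor's result directly depends on the pending change always holding an LDAP message. The null check makes that assumption explicit.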
<a id="trunkopendssrcserverorgopendsserverreplicationprotocolProtocolVersionjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -99,7 +99,7 @@
</span><span class="cx"> /**
</span><span class="cx"> * The constant for the 8th version of the replication \
protocol. </span><span class="cx"> * <ul>
</span><del>- * <li>StopMsg now has a timestamp to communicate the replica \
stop time.</li> </del><ins>+ * <li>New ReplicaOfflineMsg.</li>
</ins><span class="cx"> * </ul>
</span><span class="cx"> */
</span><span class="cx"> public static final short REPLICATION_PROTOCOL_V8 = 8;
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationprotocolReplicaOfflineMsgjava"></a>
<div class="addfile"><h4>Added: \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java \
(rev 0)
+++ trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,113 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.CSN;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+
+/**
+ * Class that defines messages sent by a replica (DS) to the replication server
+ * (RS) to let the RS know the date at which a replica went offline.
+ */
+public class ReplicaOfflineMsg extends UpdateMsg
+{
+
+ /**
+ * Constructor of a replica offline message providing the offline timestamp in
+ * a CSN.
+ *
+ * @param offlineCSN
+ * the provided offline CSN
+ */
+ public ReplicaOfflineMsg(final CSN offlineCSN)
+ {
+ super(offlineCSN, new byte[0]);
+ }
+
+ /**
+ * Creates a message by deserializing it from the provided byte array.
+ *
+ * @param in
+ * The provided byte array.
+ * @throws DataFormatException
+ * When an error occurs during decoding.
+ */
+ public ReplicaOfflineMsg(byte[] in) throws DataFormatException
+ {
+ try
+ {
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_REPLICA_OFFLINE)
+ {
+ throw new DataFormatException("input is not a valid "
+ + getClass().getSimpleName() + " message: " + msgType);
+ }
+ protocolVersion = scanner.nextShort();
+ csn = scanner.nextCSN();
+
+ if (!scanner.isEmpty())
+ {
+ throw new DataFormatException(
+ "Did not expect to find more bytes to read for "
+ + getClass().getSimpleName());
+ }
+ }
+ catch (RuntimeException e)
+ {
+ // Index out of bounds, bad format, etc.
+ throw new DataFormatException("byte[] is not a valid "
+ + getClass().getSimpleName());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte[] getBytes(short protocolVersion)
+ {
+ final ByteArrayBuilder builder = new ByteArrayBuilder(size());
+ builder.appendByte(MSG_TYPE_REPLICA_OFFLINE);
+ builder.appendShort(protocolVersion);
+ builder.appendCSN(csn);
+ return builder.toByteArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int size()
+ {
+ return bytes(1) + shorts(1) + csns(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " offlineCSN=" + csn;
+ }
+}
</ins><span class="cx">Property changes on: \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java \
</span><span class="cx">___________________________________________________________________
</span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
<a id="trunkopendssrcserverorgopendsserverreplicationprotocolReplicationMsgjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -85,6 +85,9 @@
</span><span class="cx"> // EntryMsg, InitializeRequestMsg, InitializeTargetMsg, \
ErrorMsg </span><span class="cx"> // TopologyMsg
</span><span class="cx">
</span><ins>+ /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */
+ static final byte MSG_TYPE_REPLICA_OFFLINE = 37;
+
</ins><span class="cx"> // Adding a new type of message here probably requires to
</span><span class="cx"> // change accordingly generateMsg method below
</span><span class="cx">
</span><span class="lines">@@ -199,6 +202,8 @@
</span><span class="cx"> return new StopMsg(buffer);
</span><span class="cx"> case MSG_TYPE_INITIALIZE_RCV_ACK:
</span><span class="cx"> return new InitializeRcvAckMsg(buffer);
</span><ins>+ case MSG_TYPE_REPLICA_OFFLINE:
+ return new ReplicaOfflineMsg(buffer);
</ins><span class="cx"> default:
</span><span class="cx"> throw new DataFormatException("received message \
with unknown type"); </span><span class="cx"> }
</span></span></pre></div>
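The wire layout written by ReplicaOfflineMsg.getBytes() is a type byte, a protocol version short, and a CSN, and the decoding constructor rejects a wrong type byte or trailing bytes. A rough round-trip sketch of that layout using java.nio.ByteBuffer is shown below. It makes one loud assumption: the CSN is reduced to a single 8-byte long here, which is not how OpenDJ's ByteArrayBuilder encodes a CSN. OfflineMsgCodecSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;

/** Hypothetical codec sketch; the real encoding lives in ByteArrayBuilder/ByteArrayScanner. */
final class OfflineMsgCodecSketch
{
  /** Message type value taken from the ReplicationMsg diff. */
  static final byte MSG_TYPE_REPLICA_OFFLINE = 37;

  // Assumption: the CSN is modelled as one long (8 bytes) for illustration only.
  private static final int ENCODED_SIZE = 1 + 2 + 8;

  /** Encodes type byte, protocol version, then the simplified CSN. */
  static byte[] encode(short protocolVersion, long csn)
  {
    return ByteBuffer.allocate(ENCODED_SIZE)
        .put(MSG_TYPE_REPLICA_OFFLINE)
        .putShort(protocolVersion)
        .putLong(csn)
        .array();
  }

  /** Decodes the simplified CSN, rejecting a wrong length or type byte. */
  static long decode(byte[] in) throws DataFormatException
  {
    if (in == null || in.length != ENCODED_SIZE)
    {
      throw new DataFormatException("unexpected message length");
    }
    final ByteBuffer buffer = ByteBuffer.wrap(in);
    final byte msgType = buffer.get();
    if (msgType != MSG_TYPE_REPLICA_OFFLINE)
    {
      throw new DataFormatException("unexpected message type: " + msgType);
    }
    buffer.getShort(); // protocol version, not needed to recover the CSN
    return buffer.getLong();
  }
}
```

Checking the exact length up front plays the same role as the `scanner.isEmpty()` test in the patch: a message with extra bytes is treated as malformed rather than silently truncated.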
<a id="trunkopendssrcserverorgopendsserverreplicationserverECLServerHandlerjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -223,7 +223,13 @@
</span><span class="cx"> {
</span><span class="cx"> final UpdateMsg newMsg = mh.getNextMessage(false /* \
non blocking */); </span><span class="cx">
</span><del>- if (newMsg == null)
</del><ins>+ if (newMsg instanceof ReplicaOfflineMsg)
+ {
+ // a ReplicaOfflineMsg cannot be returned to a search on cn=changelog:
+ // proceed as if it was never returned
+ continue;
+ }
+ else if (newMsg == null)
</ins><span class="cx"> { // in non blocking mode, null means no more \
messages </span><span class="cx"> return null;
</span><span class="cx"> }
</span></span></pre></div>
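The filtering added to getNextMessage() amounts to polling until a message other than a ReplicaOfflineMsg (or null) comes back. A small sketch of that loop is shown below, assuming a plain java.util.Queue as the message source. SkipOfflineSketch, its nested message classes, and nextVisible() are hypothetical and stand in for the handler's non-blocking read.

```java
import java.util.Queue;

/** Hypothetical sketch of skipping offline markers; not the ECLServerHandler code. */
final class SkipOfflineSketch
{
  static class UpdateMsg
  {
    final String name;

    UpdateMsg(String name)
    {
      this.name = name;
    }
  }

  /** Stand-in for ReplicaOfflineMsg: carries no change visible to searches. */
  static final class OfflineMsg extends UpdateMsg
  {
    OfflineMsg()
    {
      super("offline");
    }
  }

  /**
   * Returns the next message that a changelog search may see, or null when
   * the source is exhausted (non-blocking semantics).
   */
  static UpdateMsg nextVisible(Queue<UpdateMsg> source)
  {
    while (true)
    {
      final UpdateMsg msg = source.poll();
      if (msg instanceof OfflineMsg)
      {
        continue; // proceed as if it was never returned
      }
      return msg; // either a real update or null (no more messages)
    }
  }
}
```

Since `null instanceof OfflineMsg` is false, the null case falls through to the return, which matches the order of the two checks in the patch.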
<a id="trunkopendssrcserverorgopendsserverreplicationserverReplicationServerDomainjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -496,6 +496,13 @@
</span><span class="cx"> {
</span><span class="cx"> try
</span><span class="cx"> {
</span><ins>+ if (updateMsg instanceof ReplicaOfflineMsg)
+ {
+ final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
+ this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
+ return true;
+ }
+
</ins><span class="cx"> if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
</span><span class="cx"> {
</span><span class="cx"> /*
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogfileFileChangelogDBjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -45,6 +45,7 @@
</span><span class="cx"> import org.opends.server.replication.server.changelog.api.*;
</span><span class="cx"> import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
</span><span class="cx"> import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
</span><ins>+import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
</ins><span class="cx"> import org.opends.server.types.DN;
</span><span class="cx"> import org.opends.server.types.DebugLogLevel;
</span><span class="cx"> import org.opends.server.util.StaticUtils;
</span><span class="lines">@@ -388,7 +389,7 @@
</span><span class="cx"> {
</span><span class="cx"> if (debugEnabled())
</span><span class="cx"> {
</span><del>- TRACER.debugError("clear the FileChangelogDB");
</del><ins>+ TRACER.debugInfo("clear the FileChangelogDB");
</ins><span class="cx"> }
</span><span class="cx"> if (!dbDirectory.exists())
</span><span class="cx"> {
</span><span class="lines">@@ -631,18 +632,39 @@
</span><span class="cx"> throws ChangelogException
</span><span class="cx"> {
</span><span class="cx"> final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
</span><ins>+ final ChangelogState state = replicationEnv.readChangelogState();
</ins><span class="cx"> final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
</span><span class="cx"> for (int serverId : serverIds)
</span><span class="cx"> {
</span><span class="cx"> // get the last already sent CSN from that server to get a cursor
</span><span class="cx"> final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
</span><del>- cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
</del><ins>+ final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
+ final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+ cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
</ins><span class="cx"> }
</span><span class="cx"> // recycle exhausted cursors,
</span><span class="cx"> // because client code will not manage the cursors itself
</span><span class="cx"> return new CompositeDBCursor<Void>(cursors, true);
</span><span class="cx"> }
</span><span class="cx">
</span><ins>+ private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+ ServerState startAfterServerState)
+ {
+ final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
+ if (domain != null)
+ {
+ for (CSN offlineCSN : domain)
+ {
+ if (serverId == offlineCSN.getServerId()
+ && (startAfterServerState == null || !startAfterServerState.cover(offlineCSN)))
+ {
+ return offlineCSN;
+ }
+ }
+ }
+ return null;
+ }
+
</ins><span class="cx"> /** {@inheritDoc} */
</span><span class="cx"> @Override
</span><span class="cx"> public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
</span></span></pre></div>
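<p>The selection rule in getOfflineCSN above can be sketched in isolation: pick the offline CSN recorded for a replica unless the caller's start state already covers it. This is a simplified illustration under stated assumptions. Csn, covers and selectOfflineCsn are hypothetical names, the server state is modelled as a plain map from server id to the newest timestamp seen, and "covers" is assumed to mean "holds a CSN at least as new for that replica". The sketch also treats a null start state as covering nothing, which the committed code does not guard against.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: hypothetical stand-ins for CSN, ServerState and ChangelogState. */
final class OfflineCsnSelectionSketch
{
  /** Stand-in for a CSN: a timestamp plus the id of the replica that produced it. */
  static final class Csn
  {
    final long time;
    final int serverId;

    Csn(long time, int serverId)
    {
      this.time = time;
      this.serverId = serverId;
    }
  }

  /** Assumed meaning of "cover": the state holds a timestamp at least as new for that replica. */
  static boolean covers(Map<Integer, Long> state, Csn csn)
  {
    final Long seen = state.get(csn.serverId);
    return seen != null && seen >= csn.time;
  }

  /** Returns the first offline CSN of the given replica that the start state does not cover. */
  static Csn selectOfflineCsn(List<Csn> offlineCsns, int serverId, Map<Integer, Long> startAfter)
  {
    if (offlineCsns == null)
    {
      return null; // no replica of this domain is known to be offline
    }
    for (Csn offlineCsn : offlineCsns)
    {
      if (serverId == offlineCsn.serverId
          && (startAfter == null || !covers(startAfter, offlineCsn)))
      {
        return offlineCsn;
      }
    }
    return null;
  }

  public static void main(String[] args)
  {
    final List<Csn> offline = Arrays.asList(new Csn(10L, 1), new Csn(20L, 2));
    final Map<Integer, Long> startAfter = new HashMap<Integer, Long>();
    startAfter.put(1, 15L); // the client has already seen replica 1 up to time 15

    System.out.println(selectOfflineCsn(offline, 1, startAfter) == null);
    System.out.println(selectOfflineCsn(offline, 2, startAfter).time);
  }
}
```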
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogfileReplicationEnvironmentjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -334,9 +334,13 @@
</span><span class="cx"> */
</span><span class="cx"> void clearGenerationId(final DN domainDN) throws ChangelogException
</span><span class="cx"> {
</span><del>- synchronized(domainLock)
</del><ins>+ synchronized (domainLock)
</ins><span class="cx"> {
</span><span class="cx"> final String domainId = domains.get(domainDN);
</span><ins>+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
</ins><span class="cx"> final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
</span><span class="cx"> if (idFile != null)
</span><span class="cx"> {
</span><span class="lines">@@ -365,6 +369,10 @@
</span><span class="cx"> {
</span><span class="cx"> clearGenerationId(baseDN);
</span><span class="cx"> final String domainId = domains.get(baseDN);
</span><ins>+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
</ins><span class="cx"> final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
</span><span class="cx"> ensureGenerationIdFileExists(generationIdPath);
</span><span class="cx"> }
</span><span class="lines">@@ -386,11 +394,14 @@
</span><span class="cx"> synchronized (domainLock)
</span><span class="cx"> {
</span><span class="cx"> final String domainId = domains.get(domainDN);
</span><ins>+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
</ins><span class="cx"> final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
</span><span class="cx"> if (!serverIdPath.exists())
</span><span class="cx"> {
</span><del>- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
- domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
</del><ins>+ return; // no serverId anymore => no-op
</ins><span class="cx"> }
</span><span class="cx"> final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
</span><span class="cx"> Writer writer = null;
</span><span class="lines">@@ -428,6 +439,10 @@
</span><span class="cx"> synchronized (domainLock)
</span><span class="cx"> {
</span><span class="cx"> final String domainId = domains.get(domainDN);
</span><ins>+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
</ins><span class="cx"> final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
</span><span class="cx"> if (offlineFile.exists())
</span><span class="cx"> {
</span><span class="lines">@@ -512,14 +527,12 @@
</span><span class="cx"> throws ChangelogException
</span><span class="cx"> {
</span><span class="cx"> final File domainDirectory = getDomainPath(domainEntry.getValue());
</span><ins>+ final DN domainDN = domainEntry.getKey();
</ins><span class="cx"> final String generationId = retrieveGenerationId(domainDirectory);
</span><del>- if (generationId == null)
</del><ins>+ if (generationId != null)
</ins><span class="cx"> {
</span><del>- throw new ChangelogException(ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND.get(
- replicationRootPath, domainDirectory.getPath()));
</del><ins>+ state.setDomainGenerationId(domainDN, toGenerationId(generationId));
</ins><span class="cx"> }
</span><del>- final DN domainDN = domainEntry.getKey();
- state.setDomainGenerationId(domainDN, toGenerationId(generationId));
</del><span class="cx">
</span><span class="cx"> final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
</span><span class="cx"> if (serverIds == null)
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogjeJEChangelogDBjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -707,18 +707,39 @@
</span><span class="cx"> throws ChangelogException
</span><span class="cx"> {
</span><span class="cx"> final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
</span><ins>+ final ChangelogState state = dbEnv.readChangelogState();
</ins><span class="cx"> final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
</span><span class="cx"> for (int serverId : serverIds)
</span><span class="cx"> {
</span><span class="cx"> // get the last already sent CSN from that server to get a cursor
</span><span class="cx"> final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
</span><del>- cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
</del><ins>+ final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
+ final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+ cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
</ins><span class="cx"> }
</span><span class="cx"> // recycle exhausted cursors,
</span><span class="cx"> // because client code will not manage the cursors itself
</span><span class="cx"> return new CompositeDBCursor<Void>(cursors, true);
</span><span class="cx"> }
</span><span class="cx">
</span><ins>+ private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+ ServerState startAfterServerState)
+ {
+ final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
+ if (domain != null)
+ {
+ for (CSN offlineCSN : domain)
+ {
+ if (serverId == offlineCSN.getServerId()
+ && (startAfterServerState == null || !startAfterServerState.cover(offlineCSN)))
+ {
+ return offlineCSN;
+ }
+ }
+ }
+ return null;
+ }
+
</ins><span class="cx"> /** {@inheritDoc} */
</span><span class="cx"> @Override
</span><span class="cx"> public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogjeReplicaOfflineCursorjava"></a>
<div class="addfile"><h4>Added: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java \
(rev 0)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,126 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * Implementation of a DBCursor that decorates an existing DBCursor
+ * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
+ * and the offline CSN is newer than the last returned update CSN.
+ */
+public class ReplicaOfflineCursor implements DBCursor<UpdateMsg>
+{
+ /** @NonNull */
+ private final DBCursor<UpdateMsg> cursor;
+ private ReplicaOfflineMsg replicaOfflineMsg;
+ /**
+ * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
+ */
+ private boolean returnReplicaOfflineMsg;
+
+ /**
+ * Creates a ReplicaOfflineCursor object with a cursor to decorate
+ * and an offlineCSN to return as part of a ReplicaOfflineMsg.
+ *
+ * @param cursor
+ * the non-null underlying cursor that needs to be exhausted before
+ * we return a ReplicaOfflineMsg
+ * @param offlineCSN
+ * The offline CSN from which to build the
+ * {@link ReplicaOfflineMsg} to return
+ */
+ public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN)
+ {
+ this.replicaOfflineMsg =
+ offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
+ this.cursor = cursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ if (returnReplicaOfflineMsg)
+ {
+ // already consumed, never return it again...
+ replicaOfflineMsg = null;
+ returnReplicaOfflineMsg = false;
+ // ...and verify if new changes have been added to the DB
+ // (cursors are automatically restarted)
+ }
+ final UpdateMsg lastUpdate = cursor.getRecord();
+ final boolean hasNext = cursor.next();
+ if (hasNext)
+ {
+ return true;
+ }
+ if (replicaOfflineMsg == null)
+ { // no ReplicaOfflineMsg to return
+ return false;
+ }
+
+ // replicaDB just happened to be exhausted now
+ if (lastUpdate != null
+ && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
+ {
+ // offlineCSN is outdated, never return it
+ replicaOfflineMsg = null;
+ return false;
+ }
+ returnReplicaOfflineMsg = true;
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName()
+ + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg
+ + " offlineCSN="
+ + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null)
+ + " cursor=" + cursor.toString().split("", 2)[1];
+ }
+
+}
</ins><span class="cx">\ No newline at end of file
</span><span class="cx">Property changes on: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
</span><span class="cx">___________________________________________________________________
</span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
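<p>The decorator behaviour documented in ReplicaOfflineCursor above can be reproduced in a self-contained sketch: once the wrapped cursor is exhausted, one extra offline marker is emitted, unless its CSN is not newer than the last update returned. Msg, Cursor, ListCursor, OfflineAwareCursor and drain are hypothetical, simplified stand-ins for illustration. CSNs are reduced to plain long timestamps, and nothing here is the OpenDJ API.</p>

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: hypothetical, simplified stand-ins for DBCursor, UpdateMsg and CSN. */
final class OfflineCursorSketch {
  /** A message with a numeric CSN; offline is true for the replica offline marker. */
  static final class Msg {
    final long csn;
    final boolean offline;
    Msg(long csn, boolean offline) { this.csn = csn; this.offline = offline; }
  }

  /** Cursor contract: next() advances; getRecord() returns the current message or null. */
  interface Cursor {
    Msg getRecord();
    boolean next();
  }

  /** Cursor over a fixed sequence of update CSNs, standing in for a replica DB cursor. */
  static final class ListCursor implements Cursor {
    private final long[] csns;
    private int index = -1; // before the first element
    ListCursor(long... csns) { this.csns = csns; }
    public Msg getRecord() {
      return index >= 0 && index < csns.length ? new Msg(csns[index], false) : null;
    }
    public boolean next() {
      if (index < csns.length) { index++; }
      return index < csns.length;
    }
  }

  /** Decorator that appends at most one offline marker after the delegate is exhausted. */
  static final class OfflineAwareCursor implements Cursor {
    private final Cursor delegate;
    private Msg offlineMsg;        // null once consumed or found outdated
    private boolean returnOffline; // true while the marker is the current record

    OfflineAwareCursor(Cursor delegate, Long offlineCsn) {
      this.delegate = delegate;
      this.offlineMsg = offlineCsn != null ? new Msg(offlineCsn, true) : null;
    }

    public Msg getRecord() {
      return returnOffline ? offlineMsg : delegate.getRecord();
    }

    public boolean next() {
      if (returnOffline) {          // marker already consumed, never return it again
        offlineMsg = null;
        returnOffline = false;
      }
      final Msg last = delegate.getRecord();
      if (delegate.next()) { return true; }
      if (offlineMsg == null) { return false; }
      if (last != null && offlineMsg.csn <= last.csn) {
        offlineMsg = null;          // marker is outdated, never return it
        return false;
      }
      returnOffline = true;
      return true;
    }
  }

  /** Reads a cursor to the end and renders each message as "update@N" or "offline@N". */
  static List<String> drain(Cursor cursor) {
    final List<String> out = new ArrayList<String>();
    while (cursor.next()) {
      final Msg m = cursor.getRecord();
      out.add((m.offline ? "offline@" : "update@") + m.csn);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(drain(new OfflineAwareCursor(new ListCursor(1L), 2L)));
    System.out.println(drain(new OfflineAwareCursor(new ListCursor(2L), 1L)));
  }
}
```

<p>Keeping the marker outside the delegate means the replica DB itself never stores it, which matches the way the commit builds the message from the offline CSN at cursor creation time.</p>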
<a id="trunkopendssrcserverorgopendsserverreplicationserviceReplicationBrokerjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -2802,6 +2802,7 @@
</span><span class="cx">
</span><span class="cx"> synchronized (startStopLock)
</span><span class="cx"> {
</span><ins>+ domain.publishReplicaOfflineMsg();
</ins><span class="cx"> shutdown = true;
</span><span class="cx"> setConnectedRS(ConnectedRS.stopped());
</span><span class="cx"> stopRSHeartBeatMonitoring();
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserviceReplicationDomainjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -3450,6 +3450,15 @@
</span><span class="cx"> }
</span><span class="cx">
</span><span class="cx"> /**
</span><ins>+ * Publishes a replica offline message if all pending changes for current
+ * replica have been sent out.
+ */
+ public void publishReplicaOfflineMsg()
+ {
+ // Here to be overridden
+ }
+
+ /**
</ins><span class="cx"> * Publish information to the Replication Service (not assured mode).
</span><span class="cx"> *
</span><span class="cx"> * @param msg The byte array containing the information that should
</span></span></pre></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationprotocolSynchronizationMsgTestjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -843,6 +843,17 @@
</span><span class="cx"> assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
</span><span class="cx"> }
</span><span class="cx">
</span><ins>+ @Test
+ public void replicaOfflineMsgTest() throws Exception
+ {
+ final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
+ final ReplicaOfflineMsg expectedMsg = new ReplicaOfflineMsg(csn);
+
+ final byte[] bytes = expectedMsg.getBytes(REPLICATION_PROTOCOL_V8);
+ ReplicaOfflineMsg decodedMsg = new ReplicaOfflineMsg(bytes);
+ assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
+ }
+
</ins><span class="cx"> /**
</span><span class="cx"> * Test that WindowMsg encoding and decoding works
</span><span class="cx"> * by checking that : msg == new WindowMsg(msg.getBytes()).
</span></span></pre></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserverchangelogjeCompositeDBCursorTestjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 2014-06-25 \
15:43:38 UTC (rev 10839)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -29,7 +29,6 @@
</span><span class="cx"> import java.util.Map;
</span><span class="cx">
</span><span class="cx"> import org.opends.server.DirectoryServerTestCase;
</span><del>-import org.opends.server.replication.common.CSN;
</del><span class="cx"> import org.opends.server.replication.protocol.UpdateMsg;
</span><span class="cx"> import org.opends.server.replication.server.changelog.api.ChangelogException;
</span><span class="cx"> import org.opends.server.replication.server.changelog.api.DBCursor;
</span><span class="lines">@@ -56,10 +55,10 @@
</span><span class="cx"> @BeforeClass
</span><span class="cx"> public void setupMsgs()
</span><span class="cx"> {
</span><del>- msg1 = newUpdateMsg(1);
- msg2 = newUpdateMsg(2);
- msg3 = newUpdateMsg(3);
- msg4 = newUpdateMsg(4);
</del><ins>+ msg1 = new FakeUpdateMsg(1);
+ msg2 = new FakeUpdateMsg(2);
+ msg3 = new FakeUpdateMsg(3);
+ msg4 = new FakeUpdateMsg(4);
</ins><span class="cx"> }
</span><span class="cx">
</span><span class="cx"> @Test
</span><span class="lines">@@ -134,19 +133,6 @@
</span><span class="cx"> of(msg4, baseDN1));
</span><span class="cx"> }
</span><span class="cx">
</span><del>- private UpdateMsg newUpdateMsg(final int t)
- {
- return new UpdateMsg(new CSN(t, t, t), new byte[t])
- {
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return "UpdateMsg(" + t + ")";
- }
- };
- }
-
</del><span class="cx"> private CompositeDBCursor<String> newCompositeDBCursor(
</span><span class="cx"> Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
</span><span class="cx"> {
</span></span></pre></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserverchangelogjeFakeUpdateMsgjava"></a>
<div class="addfile"><h4>Added: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java \
(rev 0)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,47 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+
+@SuppressWarnings("javadoc")
+class FakeUpdateMsg extends UpdateMsg
+{
+ private final int t;
+
+ FakeUpdateMsg(int t)
+ {
+ super(new CSN(t, t, t), new byte[1]);
+ this.t = t;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "UpdateMsg(" + t + ")";
+ }
+}
</ins><span class="cx">\ No newline at end of file
</span><span class="cx">Property changes on: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
</span><span class="cx">___________________________________________________________________
</span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserverchangelogjeReplicaOfflineCursorTestjava"></a>
<div class="addfile"><h4>Added: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java \
(rev 0)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java 2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,129 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+
+/**
+ * Test the ReplicaOfflineCursor class.
+ */
+@SuppressWarnings("javadoc")
+public class ReplicaOfflineCursorTest extends ReplicationTestCase
+{
+
+ private int timestamp;
+ private DBCursor<UpdateMsg> delegateCursor;
+
+ @BeforeTest
+ public void init()
+ {
+ timestamp = 1;
+ }
+
+ @Test
+ public void cursorReturnsFalse() throws Exception
+ {
+ delegateCursor = new SequentialDBCursor();
+
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsTrue() throws Exception
+ {
+ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+ delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isSameAs(updateMsg);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsReplicaOfflineMsg() throws Exception
+ {
+ delegateCursor = new SequentialDBCursor();
+
+ final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ final UpdateMsg record = cursor.getRecord();
+ assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+ assertThat(record.getCSN()).isEqualTo(offlineCSN);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
+ {
+ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+ delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+ final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isSameAs(updateMsg);
+ assertThat(cursor.next()).isTrue();
+ final UpdateMsg record = cursor.getRecord();
+ assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+ assertThat(record.getCSN()).isEqualTo(offlineCSN);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() throws Exception
+ {
+ final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
+
+ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+ delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isSameAs(updateMsg);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+}
</ins><span class="cx">Property changes on: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
</span><span class="cx">___________________________________________________________________
</span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
</div>
<div id="footer">Copyright (c) by ForgeRock. All rights reserved.</div>
</body>
</html>