[prev in list] [next in list] [prev in thread] [next in thread] 

List:       forgerock-opendj-dev
Subject:    [Opendj-dev] [10840] trunk/opends: OPENDJ-1453 (CR-3870) Replica offline messages should be synced w
From:       noreply () forgerock ! org
Date:       2014-06-26 13:19:57
Message-ID: 20140626131957.B1E17408FD () sources ! internal ! forgerock ! com
[Download RAW message or body]

[Attachment #2 (text/html)]

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[10840] trunk/opends: OPENDJ-1453 (CR-3870) Replica offline messages should be \
synced with updates</title> </head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: \
verdana,arial,helvetica,sans-serif; font-size: 10pt;  } #msg dl a { font-weight: \
bold} #msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: \
bold; } #msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: \
6px; } #logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em \
0; } #logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg \
h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; } \
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; \
} #logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: \
-1.5em; padding-left: 1.5em; } #logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em \
1em 0 1em; background: white;} #logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid \
#fa0; border-bottom: 1px solid #fa0; background: #fff; } #logmsg table th { \
text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted \
#fa0; } #logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: \
0.2em 0.5em; } #logmsg table thead th { text-align: center; border-bottom: 1px solid \
#fa0; } #logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: \
6px; } #patch { width: 100%; }
#patch h4 {font-family: \
verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
 #patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, \
#patch .copfile {border:1px solid #ccc;margin:10px 0;} #patch ins \
{background:#dfd;text-decoration:none;display:block;padding:0 10px;} #patch del \
{background:#fdd;text-decoration:none;display:block;padding:0 10px;} #patch .lines, \
                .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a \
href="http://sources.forgerock.org/changelog/opendj/?cs=10840">10840</a></dd> \
<dt>Author</dt> <dd>JnRouvignac</dd> <dt>Date</dt> <dd>2014-06-26 14:19:57 +0100 \
(Thu, 26 Jun 2014)</dd> </dl>

<h3>Log Message</h3>
<pre><a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1453">OPENDJ-1453</a> \
(CR-3870) Replica offline messages should be synced with updates

Added a new ReplicaOfflineMsg to communicate that a replica is offline. \
ReplicaOfflineMsg extends UpdateMsg. It is possible that <a \
href="https://bugster.forgerock.org/jira/browse/OPENDJ-1260">OPENDJ-1260</a> will \
piggy back on this new message type. This patch makes a DS send a ReplicaOfflineMsg \
to its preferred RS on shutdown. This works when we have split DS-RS, but works only \
50% (guesstimate) of the time with combined DS-RS. There is a race condition between \
shutdown and the ReplicaOfflineMsg being forwarded by the collocated RS. Last but not \
least, RSs can communicate such ReplicaOfflineMsg to other RSs via ServerWriter / \
ServerReader.

Another last item: Due to current change, replication is querying the changelogstate \
a lot more often. This is not playing nice with ExternalChangeLogTest and \
GenerationIdTest and required changing the File-based changelog \
ReplicationEnvironment class. It might be necessary to reduce I/O to maintain an in \
memory copy of the changelogstate. I might do this in a subsequent commit.




ReplicaOfflineMsg.java: ADDED

ReplicationMsg.java:
Added support for ReplicaOfflineMsg.

ProtocolVersion.java:
Updated javadoc for REPLICATION_PROTOCOL_V8.

LDAPReplicationDomain.java, ReplicationDomain.java:
Added publishReplicaOfflineMsg().

ReplicationBroker.java:
In stop(), called ReplicationDomain.publishReplicaOfflineMsg().

ReplicationServerDomain.java:
In publishUpdateMsg(UpdateMsg), handled ReplicaOfflineMsg.



PendingChange.java:
Changed msg field + getMsg() return type from LDAPUpdateMsg to UpdateMsg.
Added getLDAPUpdateMsg().

PendingChanges.java:
Added putReplicaOfflineMsg().
In pushCommittedChanges(), changed the code as a consequence of the change to \
PendingChange.getMsg().

RemotePendingChanges.java:
Changed the code as a consequence of the change to PendingChange.getMsg().



JEChangelogDB.java:
Added ReplicaOfflineCursor inner class, decorator for a DBCursor + used it in \
getCursorFrom() to return ReplicaOfflineMsg to ServerWriter thread.

JEChangelogDBTest.java: ADDED
Tested ReplicaOfflineCursor.

ECLServerHandler.java:
In getNextMessage(), ignore ReplicaOfflineMsg which are useless to searches on \
cn=changelog.



SynchronizationMsgTest.java:
Added a test for ReplicaOfflineMsg (de)serialization.

FakeUpdateMsg.java: ADDED, extracted from CompositeDBCursorTest

CompositeDBCursorTest.java:
Replaced call to newUpdateMsg() by calling FakeUpdateMsg ctor.



ReplicationEnvironment.java:
Fixes to make ExternalChangeLogTest and GenerationIdTest work.

replication.properties:
Removed now useless messages.</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#trunkopendssrcmessagesmessagesreplicationproperties">trunk/opends/src/messages/messages/replication.properties</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginLDAPReplicationDoma \
injava">trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginPendingChangejava"> \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginPendingChangesjava" \
>trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java</a></li>
> 
<li><a href="#trunkopendssrcserverorgopendsserverreplicationpluginRemotePendingChanges \
java">trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationprotocolProtocolVersionja \
va">trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationprotocolReplicationMsgjav \
a">trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserverECLServerHandlerjav \
a">trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserverReplicationServerDo \
mainjava">trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogfileFileCh \
angelogDBjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogfileReplic \
ationEnvironmentjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogjeJEChange \
logDBjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserviceReplicationBrokerj \
ava">trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserviceReplicationDomainj \
ava">trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java</a></li>
 <li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationproto \
colSynchronizationMsgTestjava">trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java</a></li>
 <li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserve \
rchangelogjeCompositeDBCursorTestjava">trunk/opends/tests/unit-tests-testng/src/server \
/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java</a></li>
 </ul>

<h3>Added Paths</h3>
<ul>
<li><a href="#trunkopendssrcserverorgopendsserverreplicationprotocolReplicaOfflineMsgj \
ava">trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java</a></li>
 <li><a href="#trunkopendssrcserverorgopendsserverreplicationserverchangelogjeReplicaO \
fflineCursorjava">trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java</a></li>
 <li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserve \
rchangelogjeFakeUpdateMsgjava">trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java</a></li>
 <li><a href="#trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserve \
rchangelogjeReplicaOfflineCursorTestjava">trunk/opends/tests/unit-tests-testng/src/ser \
ver/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java</a></li>
 </ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="trunkopendssrcmessagesmessagesreplicationproperties"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/messages/messages/replication.properties (10839 => 10840)</h4> <pre \
class="diff"><span> <span class="info">--- \
trunk/opends/src/messages/messages/replication.properties	2014-06-25 15:43:38 UTC \
                (rev 10839)
+++ trunk/opends/src/messages/messages/replication.properties	2014-06-26 13:19:57 UTC \
(rev 10840) </span><span class="lines">@@ -533,9 +533,6 @@
</span><span class="cx">  change %s to replicaDB %s %s because flushing thread is \
shutting down </span><span class="cx"> \
SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \ \
</span><span class="cx">  state from root path '%s' : directory might not exist \
</span><del>-SEVERE_ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when \
                retrieving \
- changelog state from root path '%s' : no generation id file found in domain \
- directory '%s'
</del><span class="cx"> \
SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
\ </span><span class="cx">  changelog state from root path '%s' : IO error on domain \
directory '%s' when retrieving \ </span><span class="cx">  list of server ids
</span><span class="lines">@@ -603,8 +600,6 @@
</span><span class="cx">  head log file from '%s' to '%s'
</span><span class="cx"> INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log \
file '%s', \ </span><span class="cx">  size of head log file is %d bytes
</span><del>-SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could \
                not add replica \
- offline for domain %s and server id %d because the path '%s' does not exist
</del><span class="cx"> \
SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write \
offline \ </span><span class="cx">  replica information for domain %s and server id \
%d, using path '%s' (offline CSN is %s) </span><span class="cx"> \
SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica \
offline \ </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginLDAPReplicationDomainjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -2012,6 +2012,13 @@
</span><span class="cx">     addOperation.setAttachment(SYNCHROCONTEXT, ctx);
</span><span class="cx">   }
</span><span class="cx"> 
</span><ins>+  /** {@inheritDoc} */
+  @Override
+  public void publishReplicaOfflineMsg()
+  {
+    pendingChanges.putReplicaOfflineMsg();
+  }
+
</ins><span class="cx">   /**
</span><span class="cx">    * Check if an operation must be synchronized.
</span><span class="cx">    * Also update the list of pending changes and the server \
RUV </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginPendingChangejava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -29,6 +29,7 @@
</span><span class="cx"> import org.opends.server.replication.common.CSN;
</span><span class="cx"> import org.opends.server.replication.common.ServerState;
</span><span class="cx"> import org.opends.server.replication.protocol.LDAPUpdateMsg;
</span><ins>+import org.opends.server.replication.protocol.UpdateMsg;
</ins><span class="cx"> import org.opends.server.types.operation.PluginOperation;
</span><span class="cx"> 
</span><span class="cx"> /**
</span><span class="lines">@@ -39,7 +40,7 @@
</span><span class="cx"> {
</span><span class="cx">   private final CSN csn;
</span><span class="cx">   private boolean committed;
</span><del>-  private LDAPUpdateMsg msg;
</del><ins>+  private UpdateMsg msg;
</ins><span class="cx">   private final PluginOperation op;
</span><span class="cx">   private ServerState dependencyState;
</span><span class="cx"> 
</span><span class="lines">@@ -49,7 +50,7 @@
</span><span class="cx">    * @param op the operation to use
</span><span class="cx">    * @param msg the message to use (can be null for local \
operations) </span><span class="cx">    */
</span><del>-  PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
</del><ins>+  PendingChange(CSN csn, PluginOperation op, UpdateMsg msg)
</ins><span class="cx">   {
</span><span class="cx">     this.csn = csn;
</span><span class="cx">     this.committed = false;
</span><span class="lines">@@ -89,12 +90,27 @@
</span><span class="cx">    * @return the message if operation was a replication \
operation </span><span class="cx">    * null if the operation was a local operation
</span><span class="cx">    */
</span><del>-  public LDAPUpdateMsg getMsg()
</del><ins>+  public UpdateMsg getMsg()
</ins><span class="cx">   {
</span><span class="cx">     return msg;
</span><span class="cx">   }
</span><span class="cx"> 
</span><span class="cx">   /**
</span><ins>+   * Get the LDAPUpdateMsg associated to this PendingChange.
+   *
+   * @return the LDAPUpdateMsg if operation was a replication operation, null
+   *         otherwise
+   */
+  public LDAPUpdateMsg getLDAPUpdateMsg()
+  {
+    if (msg instanceof LDAPUpdateMsg)
+    {
+      return (LDAPUpdateMsg) msg;
+    }
+    return null;
+  }
+
+  /**
</ins><span class="cx">    * Set the message associated to the PendingChange.
</span><span class="cx">    * @param msg the message
</span><span class="cx">    */
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginPendingChangesjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -33,6 +33,8 @@
</span><span class="cx"> import org.opends.server.replication.common.CSN;
</span><span class="cx"> import org.opends.server.replication.common.CSNGenerator;
</span><span class="cx"> import org.opends.server.replication.protocol.LDAPUpdateMsg;
</span><ins>+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
</ins><span class="cx"> import \
org.opends.server.replication.service.ReplicationDomain; </span><span class="cx"> \
import org.opends.server.types.operation.PluginOperation; </span><span class="cx"> 
</span><span class="lines">@@ -136,6 +138,20 @@
</span><span class="cx">   }
</span><span class="cx"> 
</span><span class="cx">   /**
</span><ins>+   * Add a replica offline message to the pending list.
+   */
+  public synchronized void putReplicaOfflineMsg()
+  {
+    final CSN offlineCSN = csnGenerator.newCSN();
+    final PendingChange pendingChange =
+        new PendingChange(offlineCSN, null, new ReplicaOfflineMsg(offlineCSN));
+    pendingChange.setCommitted(true);
+
+    pendingChanges.put(offlineCSN, pendingChange);
+    pushCommittedChanges();
+  }
+
+  /**
</ins><span class="cx">    * Push all committed local changes to the \
replicationServer service. </span><span class="cx">    */
</span><span class="cx">   synchronized void pushCommittedChanges()
</span><span class="lines">@@ -152,20 +168,26 @@
</span><span class="cx">     while (firstChange != null &amp;&amp; \
firstChange.isCommitted()) </span><span class="cx">     {
</span><span class="cx">       final PluginOperation op = firstChange.getOp();
</span><del>-      if (op != null &amp;&amp; !op.isSynchronizationOperation())
</del><ins>+      final UpdateMsg msg = firstChange.getMsg();
+      if (msg instanceof LDAPUpdateMsg
+          &amp;&amp; op != null
+          &amp;&amp; !op.isSynchronizationOperation())
</ins><span class="cx">       {
</span><del>-        final LDAPUpdateMsg updateMsg = firstChange.getMsg();
</del><span class="cx">         if (!recoveringOldChanges)
</span><span class="cx">         {
</span><del>-          domain.publish(updateMsg);
</del><ins>+          domain.publish(msg);
</ins><span class="cx">         }
</span><span class="cx">         else
</span><span class="cx">         {
</span><span class="cx">           // do not push updates until the RS catches up.
</span><span class="cx">           // @see #setRecovering(boolean)
</span><del>-          domain.getServerState().update(updateMsg.getCSN());
</del><ins>+          domain.getServerState().update(msg.getCSN());
</ins><span class="cx">         }
</span><span class="cx">       }
</span><ins>+      else if (msg instanceof ReplicaOfflineMsg)
+      {
+        domain.publish(msg);
+      }
</ins><span class="cx"> 
</span><span class="cx">       // false warning: firstEntry will not be null if \
firstChange is not null </span><span class="cx">       \
pendingChanges.remove(firstEntry.getKey()); </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationpluginRemotePendingChangesjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -157,7 +157,7 @@
</span><span class="cx">       if (change.dependenciesIsCovered(state))
</span><span class="cx">       {
</span><span class="cx">         dependentChanges.remove(change);
</span><del>-        return change.getMsg();
</del><ins>+        return change.getLDAPUpdateMsg();
</ins><span class="cx">       }
</span><span class="cx">     }
</span><span class="cx">     return null;
</span><span class="lines">@@ -208,7 +208,7 @@
</span><span class="cx">     {
</span><span class="cx">       if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx">       {
</span><del>-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+        final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx">         if (pendingMsg != \
null) </span><span class="cx">         {
</span><span class="cx">           if (pendingMsg instanceof DeleteMsg)
</span><span class="lines">@@ -300,7 +300,7 @@
</span><span class="cx">     {
</span><span class="cx">       if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx">       {
</span><del>-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+        final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx">         if (pendingMsg \
instanceof AddMsg) </span><span class="cx">         {
</span><span class="cx">           // Check if the operation to be run is an \
addOperation on a same DN. </span><span class="lines">@@ -350,13 +350,13 @@
</span><span class="cx">       return false;
</span><span class="cx">     }
</span><span class="cx"> 
</span><del>-    final DN targetDN = change.getMsg().getDN();
</del><ins>+    final DN targetDN = change.getLDAPUpdateMsg().getDN();
</ins><span class="cx"> 
</span><span class="cx">     for (PendingChange pendingChange : \
pendingChanges.values()) </span><span class="cx">     {
</span><span class="cx">       if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx">       {
</span><del>-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+        final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx">         if (pendingMsg != \
null) </span><span class="cx">         {
</span><span class="cx">           if (pendingMsg instanceof DeleteMsg)
</span><span class="lines">@@ -442,7 +442,7 @@
</span><span class="cx">     {
</span><span class="cx">       if (pendingChange.getCSN().isOlderThan(csn))
</span><span class="cx">       {
</span><del>-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
</del><ins>+        final LDAPUpdateMsg pendingMsg = \
pendingChange.getLDAPUpdateMsg(); </ins><span class="cx">         if (pendingMsg != \
null) </span><span class="cx">         {
</span><span class="cx">           if (pendingMsg instanceof DeleteMsg)
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationprotocolProtocolVersionjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -99,7 +99,7 @@
</span><span class="cx">   /**
</span><span class="cx">    * The constant for the 8th version of the replication \
protocol. </span><span class="cx">    * &lt;ul&gt;
</span><del>-   * &lt;li&gt;StopMsg now has a timestamp to communicate the replica \
stop time.&lt;/li&gt; </del><ins>+   * &lt;li&gt;New ReplicaOfflineMsg.&lt;/li&gt;
</ins><span class="cx">    * &lt;/ul&gt;
</span><span class="cx">    */
</span><span class="cx">   public static final short REPLICATION_PROTOCOL_V8 = 8;
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationprotocolReplicaOfflineMsgjava"></a>
 <div class="addfile"><h4>Added: \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java	 \
                (rev 0)
+++ trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,113 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the &quot;License&quot;).  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets &quot;[]&quot; replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.CSN;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+
+/**
+ * Class that define messages sent by a replica (DS) to the replication server
+ * (RS) to let the RS know the date at which a replica went offline.
+ */
+public class ReplicaOfflineMsg extends UpdateMsg
+{
+
+  /**
+   * Constructor of a replica offline message providing the offline timestamp in
+   * a CSN.
+   *
+   * @param offlineCSN
+   *          the provided offline CSN
+   */
+  public ReplicaOfflineMsg(final CSN offlineCSN)
+  {
+    super(offlineCSN, new byte[0]);
+  }
+
+  /**
+   * Creates a message by deserializing it from the provided byte array.
+   *
+   * @param in
+   *          The provided byte array.
+   * @throws DataFormatException
+   *           When an error occurs during decoding .
+   */
+  public ReplicaOfflineMsg(byte[] in) throws DataFormatException
+  {
+    try
+    {
+      final ByteArrayScanner scanner = new ByteArrayScanner(in);
+      final byte msgType = scanner.nextByte();
+      if (msgType != MSG_TYPE_REPLICA_OFFLINE)
+      {
+        throw new DataFormatException(&quot;input is not a valid &quot;
+            + getClass().getSimpleName() + &quot; message: &quot; + msgType);
+      }
+      protocolVersion = scanner.nextShort();
+      csn = scanner.nextCSN();
+
+      if (!scanner.isEmpty())
+      {
+        throw new DataFormatException(
+            &quot;Did not expect to find more bytes to read for &quot;
+                + getClass().getSimpleName());
+      }
+    }
+    catch (RuntimeException e)
+    {
+      // Index out of bounds, bad format, etc.
+      throw new DataFormatException(&quot;byte[] is not a valid &quot;
+          + getClass().getSimpleName());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+  {
+    final ByteArrayBuilder builder = new ByteArrayBuilder(size());
+    builder.appendByte(MSG_TYPE_REPLICA_OFFLINE);
+    builder.appendShort(protocolVersion);
+    builder.appendCSN(csn);
+    return builder.toByteArray();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int size()
+  {
+    return bytes(1) + shorts(1) + csns(1);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + &quot; offlineCSN=&quot; + csn;
+  }
+}
</ins><span class="cx">Property changes on: \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java \
</span><span class="cx">___________________________________________________________________
 </span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
<a id="trunkopendssrcserverorgopendsserverreplicationprotocolReplicationMsgjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -85,6 +85,9 @@
</span><span class="cx">   //   EntryMsg, InitializeRequestMsg, InitializeTargetMsg, \
ErrorMsg </span><span class="cx">   //   TopologyMsg
</span><span class="cx"> 
</span><ins>+  /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */
+  static final byte MSG_TYPE_REPLICA_OFFLINE = 37;
+
</ins><span class="cx">   // Adding a new type of message here probably requires to
</span><span class="cx">   // change accordingly generateMsg method below
</span><span class="cx"> 
</span><span class="lines">@@ -199,6 +202,8 @@
</span><span class="cx">       return new StopMsg(buffer);
</span><span class="cx">     case MSG_TYPE_INITIALIZE_RCV_ACK:
</span><span class="cx">       return new InitializeRcvAckMsg(buffer);
</span><ins>+    case MSG_TYPE_REPLICA_OFFLINE:
+      return new ReplicaOfflineMsg(buffer);
</ins><span class="cx">     default:
</span><span class="cx">       throw new DataFormatException(&quot;received message \
with unknown type&quot;); </span><span class="cx">     }
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverECLServerHandlerjava"></a>
<div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -223,7 +223,13 @@
</span><span class="cx">       {
</span><span class="cx">         final UpdateMsg newMsg = mh.getNextMessage(false /* \
non blocking */); </span><span class="cx"> 
</span><del>-        if (newMsg == null)
</del><ins>+        if (newMsg instanceof ReplicaOfflineMsg)
+        {
+          // and ReplicaOfflineMsg cannot be returned to a search on cn=changelog
+          // proceed as if it was never returned
+          continue;
+        }
+        else if (newMsg == null)
</ins><span class="cx">         { // in non blocking mode, null means no more \
messages </span><span class="cx">           return null;
</span><span class="cx">         }
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverReplicationServerDomainjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -496,6 +496,13 @@
</span><span class="cx">   {
</span><span class="cx">     try
</span><span class="cx">     {
</span><ins>+      if (updateMsg instanceof ReplicaOfflineMsg)
+      {
+        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
+        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
+        return true;
+      }
+
</ins><span class="cx">       if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
</span><span class="cx">       {
</span><span class="cx">         /*
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogfileFileChangelogDBjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -45,6 +45,7 @@
</span><span class="cx"> import org.opends.server.replication.server.changelog.api.*;
</span><span class="cx"> import \
org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; </span><span \
class="cx"> import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
 </span><ins>+import \
org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor; </ins><span \
class="cx"> import org.opends.server.types.DN; </span><span class="cx"> import \
org.opends.server.types.DebugLogLevel; </span><span class="cx"> import \
org.opends.server.util.StaticUtils; </span><span class="lines">@@ -388,7 +389,7 @@
</span><span class="cx">   {
</span><span class="cx">     if (debugEnabled())
</span><span class="cx">     {
</span><del>-      TRACER.debugError(&quot;clear the FileChangelogDB&quot;);
</del><ins>+      TRACER.debugInfo(&quot;clear the FileChangelogDB&quot;);
</ins><span class="cx">     }
</span><span class="cx">     if (!dbDirectory.exists())
</span><span class="cx">     {
</span><span class="lines">@@ -631,18 +632,39 @@
</span><span class="cx">       throws ChangelogException
</span><span class="cx">   {
</span><span class="cx">     final Set&lt;Integer&gt; serverIds = \
getDomainMap(baseDN).keySet(); </span><ins>+    final ChangelogState state = \
replicationEnv.readChangelogState(); </ins><span class="cx">     final \
Map&lt;DBCursor&lt;UpdateMsg&gt;, Void&gt; cursors = new \
HashMap&lt;DBCursor&lt;UpdateMsg&gt;, Void&gt;(serverIds.size()); </span><span \
class="cx">     for (int serverId : serverIds) </span><span class="cx">     {
</span><span class="cx">       // get the last already sent CSN from that server to \
get a cursor </span><span class="cx">       final CSN lastCSN = startAfterServerState \
!= null ? startAfterServerState.getCSN(serverId) : null; </span><del>-      \
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); </del><ins>+      final \
DBCursor&lt;UpdateMsg&gt; replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); \
+      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, \
startAfterServerState); +      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, \
offlineCSN), null); </ins><span class="cx">     }
</span><span class="cx">     // recycle exhausted cursors,
</span><span class="cx">     // because client code will not manage the cursors \
itself </span><span class="cx">     return new CompositeDBCursor&lt;Void&gt;(cursors, \
true); </span><span class="cx">   }
</span><span class="cx"> 
</span><ins>+  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int \
serverId, +      ServerState startAfterServerState)
+  {
+    final List&lt;CSN&gt; domain = state.getOfflineReplicas().get(baseDN);
+    if (domain != null)
+    {
+      for (CSN offlineCSN : domain)
+      {
+        if (serverId == offlineCSN.getServerId()
+            &amp;&amp; !startAfterServerState.cover(offlineCSN))
+        {
+          return offlineCSN;
+        }
+      }
+    }
+    return null;
+  }
+
</ins><span class="cx">   /** {@inheritDoc} */
</span><span class="cx">   @Override
</span><span class="cx">   public DBCursor&lt;UpdateMsg&gt; getCursorFrom(final DN \
baseDN, final int serverId, final CSN startAfterCSN) </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogfileReplicationEnvironmentjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -334,9 +334,13 @@
</span><span class="cx">    */
</span><span class="cx">   void clearGenerationId(final DN domainDN) throws \
ChangelogException </span><span class="cx">   {
</span><del>-    synchronized(domainLock)
</del><ins>+    synchronized (domainLock)
</ins><span class="cx">     {
</span><span class="cx">       final String domainId = domains.get(domainDN);
</span><ins>+      if (domainId == null)
+      {
+        return; // unknow domain =&gt; no-op
+      }
</ins><span class="cx">       final File idFile = \
retrieveGenerationIdFile(getDomainPath(domainId)); </span><span class="cx">       if \
(idFile != null) </span><span class="cx">       {
</span><span class="lines">@@ -365,6 +369,10 @@
</span><span class="cx">     {
</span><span class="cx">       clearGenerationId(baseDN);
</span><span class="cx">       final String domainId = domains.get(baseDN);
</span><ins>+      if (domainId == null)
+      {
+        return; // unknow domain =&gt; no-op
+      }
</ins><span class="cx">       final File generationIdPath = \
getGenerationIdPath(domainId, NO_GENERATION_ID); </span><span class="cx">       \
ensureGenerationIdFileExists(generationIdPath); </span><span class="cx">     }
</span><span class="lines">@@ -386,11 +394,14 @@
</span><span class="cx">     synchronized (domainLock)
</span><span class="cx">     {
</span><span class="cx">       final String domainId = domains.get(domainDN);
</span><ins>+      if (domainId == null)
+      {
+        return; // unknow domain =&gt; no-op
+      }
</ins><span class="cx">       final File serverIdPath = getServerIdPath(domainId, \
offlineCSN.getServerId()); </span><span class="cx">       if (!serverIdPath.exists())
</span><span class="cx">       {
</span><del>-        throw new \
                ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
                
-            domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
</del><ins>+        return; // no serverId anymore =&gt; no-op
</ins><span class="cx">       }
</span><span class="cx">       final File offlineFile = new File(serverIdPath, \
REPLICA_OFFLINE_STATE_FILENAME); </span><span class="cx">       Writer writer = null;
</span><span class="lines">@@ -428,6 +439,10 @@
</span><span class="cx">     synchronized (domainLock)
</span><span class="cx">     {
</span><span class="cx">       final String domainId = domains.get(domainDN);
</span><ins>+      if (domainId == null)
+      {
+        return; // unknow domain =&gt; no-op
+      }
</ins><span class="cx">       final File offlineFile = new \
File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME); \
</span><span class="cx">       if (offlineFile.exists()) </span><span class="cx">     \
{ </span><span class="lines">@@ -512,14 +527,12 @@
</span><span class="cx">       throws ChangelogException
</span><span class="cx">   {
</span><span class="cx">     final File domainDirectory = \
getDomainPath(domainEntry.getValue()); </span><ins>+    final DN domainDN = \
domainEntry.getKey(); </ins><span class="cx">     final String generationId = \
retrieveGenerationId(domainDirectory); </span><del>-    if (generationId == null)
</del><ins>+    if (generationId != null)
</ins><span class="cx">     {
</span><del>-      throw new \
                ChangelogException(ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND.get(
                
-          replicationRootPath, domainDirectory.getPath()));
</del><ins>+      state.setDomainGenerationId(domainDN, \
toGenerationId(generationId)); </ins><span class="cx">     }
</span><del>-    final DN domainDN = domainEntry.getKey();
-    state.setDomainGenerationId(domainDN, toGenerationId(generationId));
</del><span class="cx"> 
</span><span class="cx">     final File[] serverIds = \
domainDirectory.listFiles(SERVER_ID_FILE_FILTER); </span><span class="cx">     if \
(serverIds == null) </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogjeJEChangelogDBjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -707,18 +707,39 @@
</span><span class="cx">       throws ChangelogException
</span><span class="cx">   {
</span><span class="cx">     final Set&lt;Integer&gt; serverIds = \
getDomainMap(baseDN).keySet(); </span><ins>+    final ChangelogState state = \
dbEnv.readChangelogState(); </ins><span class="cx">     final \
Map&lt;DBCursor&lt;UpdateMsg&gt;, Void&gt; cursors = new \
HashMap&lt;DBCursor&lt;UpdateMsg&gt;, Void&gt;(serverIds.size()); </span><span \
class="cx">     for (int serverId : serverIds) </span><span class="cx">     {
</span><span class="cx">       // get the last already sent CSN from that server to \
get a cursor </span><span class="cx">       final CSN lastCSN = startAfterServerState \
!= null ? startAfterServerState.getCSN(serverId) : null; </span><del>-      \
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); </del><ins>+      final \
DBCursor&lt;UpdateMsg&gt; replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); \
+      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, \
startAfterServerState); +      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, \
offlineCSN), null); </ins><span class="cx">     }
</span><span class="cx">     // recycle exhausted cursors,
</span><span class="cx">     // because client code will not manage the cursors \
itself </span><span class="cx">     return new CompositeDBCursor&lt;Void&gt;(cursors, \
true); </span><span class="cx">   }
</span><span class="cx"> 
</span><ins>+  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int \
serverId, +      ServerState startAfterServerState)
+  {
+    final List&lt;CSN&gt; domain = state.getOfflineReplicas().get(baseDN);
+    if (domain != null)
+    {
+      for (CSN offlineCSN : domain)
+      {
+        if (serverId == offlineCSN.getServerId()
+            &amp;&amp; !startAfterServerState.cover(offlineCSN))
+        {
+          return offlineCSN;
+        }
+      }
+    }
+    return null;
+  }
+
</ins><span class="cx">   /** {@inheritDoc} */
</span><span class="cx">   @Override
</span><span class="cx">   public DBCursor&lt;UpdateMsg&gt; getCursorFrom(final DN \
baseDN, final int serverId, final CSN startAfterCSN) </span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserverchangelogjeReplicaOfflineCursorjava"></a>
 <div class="addfile"><h4>Added: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java	 \
                (rev 0)
+++ trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,126 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the &quot;License&quot;).  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets &quot;[]&quot; replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * Implementation of a DBCursor that decorates an existing DBCursor
+ * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
+ * and the offline CSN is newer than the last returned update CSN.
+ */
+public class ReplicaOfflineCursor implements DBCursor&lt;UpdateMsg&gt;
+{
+  /** @NonNull */
+  private final DBCursor&lt;UpdateMsg&gt; cursor;
+  private ReplicaOfflineMsg replicaOfflineMsg;
+  /**
+   * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
+   */
+  private boolean returnReplicaOfflineMsg;
+
+  /**
+   * Creates a ReplicaOfflineCursor object with a cursor to decorate
+   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
+   *
+   * @param cursor
+   *          the non-null underlying cursor that needs to be exhausted before
+   *          we return a ReplicaOfflineMsg
+   * @param offlineCSN
+   *          The offline CSN from which to builder the
+   *          {@link ReplicaOfflineMsg} to return
+   */
+  public ReplicaOfflineCursor(DBCursor&lt;UpdateMsg&gt; cursor, CSN offlineCSN)
+  {
+    this.replicaOfflineMsg =
+        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
+    this.cursor = cursor;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public UpdateMsg getRecord()
+  {
+    return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next() throws ChangelogException
+  {
+    if (returnReplicaOfflineMsg)
+    {
+      // already consumed, never return it again...
+      replicaOfflineMsg = null;
+      returnReplicaOfflineMsg = false;
+      // ...and verify if new changes have been added to the DB
+      // (cursors are automatically restarted)
+    }
+    final UpdateMsg lastUpdate = cursor.getRecord();
+    final boolean hasNext = cursor.next();
+    if (hasNext)
+    {
+      return true;
+    }
+    if (replicaOfflineMsg == null)
+    { // no ReplicaOfflineMsg to return
+      return false;
+    }
+
+    // replicaDB just happened to be exhausted now
+    if (lastUpdate != null
+        &amp;&amp; replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
 +    {
+      // offlineCSN is outdated, never return it
+      replicaOfflineMsg = null;
+      return false;
+    }
+    returnReplicaOfflineMsg = true;
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    cursor.close();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName()
+        + &quot; returnReplicaOfflineMsg=&quot; + returnReplicaOfflineMsg
+        + &quot; offlineCSN=&quot;
+        + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : \
null) +        + &quot; cursor=&quot; + cursor.toString().split(&quot;&quot;, 2)[1];
+  }
+
+}
</ins><span class="cx">\ No newline at end of file
</span><span class="cx">Property changes on: \
trunk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
 </span><span class="cx">___________________________________________________________________
 </span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserviceReplicationBrokerjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -2802,6 +2802,7 @@
</span><span class="cx"> 
</span><span class="cx">     synchronized (startStopLock)
</span><span class="cx">     {
</span><ins>+      domain.publishReplicaOfflineMsg();
</ins><span class="cx">       shutdown = true;
</span><span class="cx">       setConnectedRS(ConnectedRS.stopped());
</span><span class="cx">       stopRSHeartBeatMonitoring();
</span></span></pre></div>
<a id="trunkopendssrcserverorgopendsserverreplicationserviceReplicationDomainjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -3450,6 +3450,15 @@
</span><span class="cx">   }
</span><span class="cx"> 
</span><span class="cx">   /**
</span><ins>+   * Publishes a replica offline message if all pending changes for \
current +   * replica have been sent out.
+   */
+  public void publishReplicaOfflineMsg()
+  {
+    // Here to be overridden
+  }
+
+  /**
</ins><span class="cx">    * Publish information to the Replication Service (not \
assured mode). </span><span class="cx">    *
</span><span class="cx">    * @param msg  The byte array containing the information \
that should </span></span></pre></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationprotocolSynchronizationMsgTestjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -843,6 +843,17 @@
</span><span class="cx">     assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
</span><span class="cx">   }
</span><span class="cx"> 
</span><ins>+  @Test
+  public void replicaOfflineMsgTest() throws Exception
+  {
+    final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
+    final ReplicaOfflineMsg expectedMsg = new ReplicaOfflineMsg(csn);
+
+    final byte[] bytes = expectedMsg.getBytes(REPLICATION_PROTOCOL_V8);
+    ReplicaOfflineMsg decodedMsg = new ReplicaOfflineMsg(bytes);
+    assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
+  }
+
</ins><span class="cx">   /**
</span><span class="cx">    * Test that WindowMsg encoding and decoding works
</span><span class="cx">    * by checking that : msg == new \
WindowMsg(msg.getBytes()). </span></span></pre></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserverchangelogjeCompositeDBCursorTestjava"></a>
 <div class="modfile"><h4>Modified: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java \
(10839 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java	2014-06-25 \
                15:43:38 UTC (rev 10839)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -29,7 +29,6 @@
</span><span class="cx"> import java.util.Map;
</span><span class="cx"> 
</span><span class="cx"> import org.opends.server.DirectoryServerTestCase;
</span><del>-import org.opends.server.replication.common.CSN;
</del><span class="cx"> import org.opends.server.replication.protocol.UpdateMsg;
</span><span class="cx"> import \
org.opends.server.replication.server.changelog.api.ChangelogException; </span><span \
class="cx"> import org.opends.server.replication.server.changelog.api.DBCursor; \
</span><span class="lines">@@ -56,10 +55,10 @@ </span><span class="cx">   \
@BeforeClass </span><span class="cx">   public void setupMsgs()
</span><span class="cx">   {
</span><del>-    msg1 = newUpdateMsg(1);
-    msg2 = newUpdateMsg(2);
-    msg3 = newUpdateMsg(3);
-    msg4 = newUpdateMsg(4);
</del><ins>+    msg1 = new FakeUpdateMsg(1);
+    msg2 = new FakeUpdateMsg(2);
+    msg3 = new FakeUpdateMsg(3);
+    msg4 = new FakeUpdateMsg(4);
</ins><span class="cx">   }
</span><span class="cx"> 
</span><span class="cx">   @Test
</span><span class="lines">@@ -134,19 +133,6 @@
</span><span class="cx">         of(msg4, baseDN1));
</span><span class="cx">   }
</span><span class="cx"> 
</span><del>-  private UpdateMsg newUpdateMsg(final int t)
-  {
-    return new UpdateMsg(new CSN(t, t, t), new byte[t])
-    {
-      /** {@inheritDoc} */
-      @Override
-      public String toString()
-      {
-        return &quot;UpdateMsg(&quot; + t + &quot;)&quot;;
-      }
-    };
-  }
-
</del><span class="cx">   private CompositeDBCursor&lt;String&gt; \
newCompositeDBCursor( </span><span class="cx">       Pair&lt;? extends \
DBCursor&lt;UpdateMsg&gt;, String&gt;... pairs) throws Exception </span><span \
class="cx">   { </span></span></pre></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserverchangelogjeFakeUpdateMsgjava"></a>
 <div class="addfile"><h4>Added: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java	 \
                (rev 0)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,47 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the &quot;License&quot;).  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets &quot;[]&quot; replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+
+@SuppressWarnings(&quot;javadoc&quot;)
+class FakeUpdateMsg extends UpdateMsg
+{
+  private final int t;
+
+  FakeUpdateMsg(int t)
+  {
+    super(new CSN(t, t, t), new byte[1]);
+    this.t = t;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return &quot;UpdateMsg(&quot; + t + &quot;)&quot;;
+  }
+}
</ins><span class="cx">\ No newline at end of file
</span><span class="cx">Property changes on: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
 </span><span class="cx">___________________________________________________________________
 </span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
<a id="trunkopendstestsunitteststestngsrcserverorgopendsserverreplicationserverchangelogjeReplicaOfflineCursorTestjava"></a>
 <div class="addfile"><h4>Added: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java \
(0 => 10840)</h4> <pre class="diff"><span>
<span class="info">--- \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java	 \
                (rev 0)
+++ trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java	2014-06-26 \
13:19:57 UTC (rev 10840) </span><span class="lines">@@ -0,0 +1,129 @@
</span><ins>+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the &quot;License&quot;).  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets &quot;[]&quot; replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+
+/**
+ * Test the ReplicaOfflineCursor class.
+ */
+@SuppressWarnings(&quot;javadoc&quot;)
+public class ReplicaOfflineCursorTest extends ReplicationTestCase
+{
+
+  private int timestamp;
+  private DBCursor&lt;UpdateMsg&gt; delegateCursor;
+
+  @BeforeTest
+  public void init()
+  {
+    timestamp = 1;
+  }
+
+  @Test
+  public void cursorReturnsFalse() throws Exception
+  {
+    delegateCursor = new SequentialDBCursor();
+
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, \
null); +    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsTrue() throws Exception
+  {
+    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+    delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, \
null); +    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    assertThat(cursor.getRecord()).isSameAs(updateMsg);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsReplicaOfflineMsg() throws Exception
+  {
+    delegateCursor = new SequentialDBCursor();
+
+    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, \
offlineCSN); +    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    final UpdateMsg record = cursor.getRecord();
+    assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+    assertThat(record.getCSN()).isEqualTo(offlineCSN);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
+  {
+    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+    delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, \
offlineCSN); +    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    assertThat(cursor.getRecord()).isSameAs(updateMsg);
+    assertThat(cursor.next()).isTrue();
+    final UpdateMsg record = cursor.getRecord();
+    assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+    assertThat(record.getCSN()).isEqualTo(offlineCSN);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() \
throws Exception +  {
+    final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
+
+    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+    delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, \
outdatedOfflineCSN); +    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    assertThat(cursor.getRecord()).isSameAs(updateMsg);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+}
</ins><span class="cx">Property changes on: \
trunk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
 </span><span class="cx">___________________________________________________________________
 </span></span></pre></div>
<a id="svneolstyle"></a>
<div class="addfile"><h4>Added: svn:eol-style</h4></div>
</div>
<div id="footer">Copyright (c) by ForgeRock. All rights reserved.</div>

</body>
</html>



_______________________________________________
OpenDJ-dev mailing list
OpenDJ-dev@forgerock.org
https://lists.forgerock.org/mailman/listinfo/opendj-dev


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic