
List:       hadoop-commits
Subject:    [hadoop] branch branch-3.2 updated: HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer
From:       xkrogen@apache.org
Date:       2019-07-30 21:45:21
Message-ID: 156452312088.15624.10659352110699389767@gitbox.apache.org

This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d4492bd  HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer. Contributed by Christopher Gregorian.
d4492bd is described below

commit d4492bdd9edec60c236aff85de50b963097e5a7f
Author: Christopher Gregorian <csgregorian@gmail.com>
AuthorDate: Mon Apr 29 15:37:25 2019 -0700

    HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer. Contributed by Christopher Gregorian.

    This commit also includes the follow-on commit 827a84778a4e3b8f165806dfd2966f0951a5e575.

    (cherry-picked from f96a2df38d889f29314c57f4d94227b2e419a11f)
---
 .../org/apache/hadoop/ipc/CallQueueManager.java    |   5 +-
 .../org/apache/hadoop/ipc/DecayRpcScheduler.java   |  12 +-
 .../org/apache/hadoop/ipc/DefaultRpcScheduler.java |   4 +-
 .../java/org/apache/hadoop/ipc/ExternalCall.java   |   5 +
 .../org/apache/hadoop/ipc/ProcessingDetails.java   |  96 +++++++++++++
 .../org/apache/hadoop/ipc/ProtobufRpcEngine.java   |  31 +----
 .../java/org/apache/hadoop/ipc/RpcScheduler.java   |  41 +++++-
 .../main/java/org/apache/hadoop/ipc/Server.java    | 152 ++++++++++++++++-----
 .../org/apache/hadoop/ipc/WritableRpcEngine.java   |  20 +--
 .../hadoop/ipc/metrics/RpcDetailedMetrics.java     |   6 +-
 .../org/apache/hadoop/ipc/metrics/RpcMetrics.java  |  63 ++++++---
 .../hadoop-common/src/site/markdown/Metrics.md     |   9 ++
 .../apache/hadoop/ipc/TestProcessingDetails.java   |  61 +++++++++
 .../org/apache/hadoop/ipc/TestProtoBufRpc.java     |   9 +-
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   |  18 ++-
 .../java/org/apache/hadoop/ipc/TestRpcBase.java    |  28 ++++
 .../src/test/proto/test_rpc_service.proto          |   1 +
 .../hdfs/server/namenode/FSNamesystemLock.java     |  66 ++++++---
 .../namenode/ha/TestConsistentReadsObserver.java   |   5 -
 19 files changed, 499 insertions(+), 133 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 9731e13..e18f307 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -193,9 +193,8 @@ public class CallQueueManager<E extends Schedulable>
     return scheduler.shouldBackOff(e);
   }
 
-  void addResponseTime(String name, int priorityLevel, int queueTime,
-      int processingTime) {
-    scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
+  void addResponseTime(String name, Schedulable e, ProcessingDetails details) {
+    scheduler.addResponseTime(name, e, details);
   }
 
   // This should be only called once per call and cached in the call object
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 8bb0ce4..3e3c1f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -55,6 +55,8 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+
 /**
  * The decay RPC scheduler counts incoming requests in a map, then
  * decays the counts at a fixed time interval. The scheduler is optimized
@@ -600,14 +602,18 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   @Override
-  public void addResponseTime(String name, int priorityLevel, int queueTime,
-      int processingTime) {
+  public void addResponseTime(String callName, Schedulable schedulable,
+      ProcessingDetails details) {
+    int priorityLevel = schedulable.getPriorityLevel();
+    long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
+    long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
+
     responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
     responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
         queueTime+processingTime);
     if (LOG.isDebugEnabled()) {
       LOG.debug("addResponseTime for call: {}  priority: {} queueTime: {} " +
-          "processingTime: {} ", name, priorityLevel, queueTime,
+          "processingTime: {} ", callName, priorityLevel, queueTime,
           processingTime);
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
index 0847af7..696160e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
@@ -35,8 +35,8 @@ public class DefaultRpcScheduler implements RpcScheduler {
   }
 
   @Override
-  public void addResponseTime(String name, int priorityLevel, int queueTime,
-      int processingTime) {
+  public void addResponseTime(String callName, Schedulable schedulable,
+      ProcessingDetails details) {
   }
 
   public DefaultRpcScheduler(int priorityLevels, String namespace,
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
index 5cc3665..39e5534 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -37,6 +37,11 @@ public abstract class ExternalCall<T> extends Call {
     this.action = action;
   }
 
+  @Override
+  public String getDetailedMetricsName() {
+    return "(external)";
+  }
+
   public abstract UserGroupInformation getRemoteUser();
 
   public final T get() throws InterruptedException, ExecutionException {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java
new file mode 100644
index 0000000..5b97eec
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stores the times that a call takes to be processed through each step.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public class ProcessingDetails {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ProcessingDetails.class);
+  private final TimeUnit valueTimeUnit;
+
+  /**
+   * The different stages to track the time of.
+   */
+  public enum Timing {
+    ENQUEUE,          // time for reader to insert in call queue.
+    QUEUE,            // time in the call queue.
+    HANDLER,          // handler overhead not spent in processing/response.
+    PROCESSING,       // time handler spent processing the call. always equal to
+                      // lock_free + lock_wait + lock_shared + lock_exclusive
+    LOCKFREE,         // processing with no lock.
+    LOCKWAIT,         // processing while waiting for lock.
+    LOCKSHARED,       // processing with a read lock.
+    LOCKEXCLUSIVE,    // processing with a write lock.
+    RESPONSE;         // time to encode and send response.
+  }
+
+  private long[] timings = new long[Timing.values().length];
+
+  ProcessingDetails(TimeUnit timeUnit) {
+    this.valueTimeUnit = timeUnit;
+  }
+
+  public long get(Timing type) {
+    // When using nanoTime to fetch timing information, it is possible to see
+    // time "move backward" slightly under unusual/rare circumstances. To avoid
+    // displaying a confusing number, round such timings to 0 here.
+    long ret = timings[type.ordinal()];
+    return ret < 0 ? 0 : ret;
+  }
+
+  public long get(Timing type, TimeUnit timeUnit) {
+    return timeUnit.convert(get(type), valueTimeUnit);
+  }
+
+  public void set(Timing type, long value) {
+    timings[type.ordinal()] = value;
+  }
+
+  public void set(Timing type, long value, TimeUnit timeUnit) {
+    set(type, valueTimeUnit.convert(value, timeUnit));
+  }
+
+  public void add(Timing type, long value, TimeUnit timeUnit) {
+    timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(256);
+    for (Timing type : Timing.values()) {
+      if (sb.length() > 0) {
+        sb.append(" ");
+      }
+      sb.append(type.name().toLowerCase())
+          .append("Time=").append(get(type));
+    }
+    return sb.toString();
+  }
+}
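
A note on the API above: values are stored in the unit passed to the constructor and converted on access. Below is a minimal sketch with hypothetical values; the constructor is package-private, so the sketch assumes code living in org.apache.hadoop.ipc, as the new TestProcessingDetails further down does.

package org.apache.hadoop.ipc;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.ipc.ProcessingDetails.Timing;

/**
 * Sketch (not part of this patch) of the ProcessingDetails unit handling,
 * using hypothetical values.
 */
public class ProcessingDetailsExample {
  public static void main(String[] args) {
    ProcessingDetails details = new ProcessingDetails(TimeUnit.NANOSECONDS);

    details.set(Timing.QUEUE, 5, TimeUnit.MILLISECONDS);   // 5,000,000 ns
    details.add(Timing.QUEUE, 250, TimeUnit.MICROSECONDS); // 5,250,000 ns
    // TimeUnit.convert truncates toward zero:
    System.out.println(details.get(Timing.QUEUE, TimeUnit.MILLISECONDS)); // 5

    // nanoTime deltas can come out slightly negative; get() clamps to 0:
    details.set(Timing.RESPONSE, -42);
    System.out.println(details.get(Timing.RESPONSE)); // 0
  }
}
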
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 5548566..15d068b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -520,46 +520,29 @@ public class ProtobufRpcEngine implements RpcEngine {
         Message param = request.getValue(prototype);
 
         Message result;
-        long startTime = Time.now();
-        int qTime = (int) (startTime - receiveTime);
-        Exception exception = null;
-        boolean isDeferred = false;
+        Call currentCall = Server.getCurCall().get();
         try {
           server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
           currentCallInfo.set(new CallInfo(server, methodName));
+          currentCall.setDetailedMetricsName(methodName);
           result = service.callBlockingMethod(methodDescriptor, null, param);
           // Check if this needs to be a deferred response,
           // by checking the ThreadLocal callback being set
           if (currentCallback.get() != null) {
-            Server.getCurCall().get().deferResponse();
-            isDeferred = true;
+            currentCall.deferResponse();
             currentCallback.set(null);
             return null;
           }
         } catch (ServiceException e) {
-          exception = (Exception) e.getCause();
+          Exception exception = (Exception) e.getCause();
+          currentCall.setDetailedMetricsName(
+              exception.getClass().getSimpleName());
           throw (Exception) e.getCause();
         } catch (Exception e) {
-          exception = e;
+          currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
           throw e;
         } finally {
           currentCallInfo.set(null);
-          int processingTime = (int) (Time.now() - startTime);
-          if (LOG.isDebugEnabled()) {
-            String msg =
-                "Served: " + methodName + (isDeferred ? ", deferred" : "") +
-                    ", queueTime= " + qTime +
-                    " procesingTime= " + processingTime;
-            if (exception != null) {
-              msg += " exception= " + exception.getClass().getSimpleName();
-            }
-            LOG.debug(msg);
-          }
-          String detailedMetricsName = (exception == null) ?
-              methodName :
-              exception.getClass().getSimpleName();
-          server.updateMetrics(detailedMetricsName, qTime, processingTime,
-              isDeferred);
         }
         return RpcWritable.wrap(result);
       }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index 95c5a13..63812f4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Implement this interface to be used for RPC scheduling and backoff.
  *
@@ -30,8 +32,43 @@ public interface RpcScheduler {
 
   boolean shouldBackOff(Schedulable obj);
 
-  void addResponseTime(String name, int priorityLevel, int queueTime,
-      int processingTime);
+  /**
+   * This method only exists to maintain backwards compatibility with old
+   * implementations. It will not be called by any Hadoop code, and should not
+   * be implemented by new implementations.
+   *
+   * @deprecated Use
+   * {@link #addResponseTime(String, Schedulable, ProcessingDetails)} instead.
+   */
+  @Deprecated
+  @SuppressWarnings("unused")
+  default void addResponseTime(String name, int priorityLevel, int queueTime,
+      int processingTime) {
+    throw new UnsupportedOperationException(
+        "This method is deprecated: use the other addResponseTime");
+  }
+
+  /**
+   * Store a processing time value for an RPC call into this scheduler.
+   *
+   * @param callName The name of the call.
+   * @param schedulable The schedulable representing the incoming call.
+   * @param details The details of processing time.
+   */
+  @SuppressWarnings("deprecation")
+  default void addResponseTime(String callName, Schedulable schedulable,
+      ProcessingDetails details) {
+    // For the sake of backwards compatibility with old implementations of
+    // this interface, a default implementation is supplied which uses the old
+    // method. All new implementations MUST override this interface and should
+    // NOT use the other addResponseTime method.
+    int queueTimeMs = (int)
+        details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
+    int processingTimeMs = (int)
+        details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS);
+    addResponseTime(callName, schedulable.getPriorityLevel(),
+        queueTimeMs, processingTimeMs);
+  }
 
   void stop();
 }
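
To illustrate the new contract, here is a hypothetical scheduler (not part of this patch) that overrides only the new addResponseTime variant; the getPriorityLevel and shouldBackOff signatures come from the existing RpcScheduler interface, which is not shown in full in this hunk.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.ipc.ProcessingDetails;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;

/**
 * Hypothetical scheduler showing the new-style addResponseTime: it can
 * inspect any Timing value, not just the queue/processing totals the old
 * int-based overload carried.
 */
public class LockWaitAwareScheduler implements RpcScheduler {
  private final AtomicLong totalLockWaitMs = new AtomicLong();

  @Override
  public int getPriorityLevel(Schedulable obj) {
    return 0; // single priority level in this sketch
  }

  @Override
  public boolean shouldBackOff(Schedulable obj) {
    return false;
  }

  @Override
  public void addResponseTime(String callName, Schedulable schedulable,
      ProcessingDetails details) {
    // New implementations override only this variant; the deprecated
    // overload keeps its throwing default implementation.
    totalLockWaitMs.addAndGet(
        details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
  }
}
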
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 9f286ab..cd6d350 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
 import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
 import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
 import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
@@ -63,6 +64,7 @@ import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -491,7 +493,7 @@ public abstract class Server {
    * if and only if it falls above 99.7% of requests. We start this logic
    * only once we have enough sample size.
    */
-  void logSlowRpcCalls(String methodName, int processingTime) {
+  void logSlowRpcCalls(String methodName, Call call, long processingTime) {
     final int deviation = 3;
 
     // 1024 for minSampleSize just a guess -- not a number computed based on
@@ -504,27 +506,47 @@
 
     if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
         (processingTime > threeSigma)) {
-      if(LOG.isWarnEnabled()) {
-        String client = CurCall.get().toString();
-        LOG.warn(
-            "Slow RPC : " + methodName + " took " + processingTime +
-                " milliseconds to process from client " + client);
-      }
+      LOG.warn("Slow RPC : {} took {} {} to process from client {}",
+          methodName, processingTime, RpcMetrics.TIMEUNIT, call);
       rpcMetrics.incrSlowRpc();
     }
   }
 
-  void updateMetrics(String name, int queueTime, int processingTime,
-                     boolean deferredCall) {
+  void updateMetrics(Call call, long startTime, boolean connDropped) {
+    // delta = handler + processing + response
+    long deltaNanos = Time.monotonicNowNanos() - startTime;
+    long timestampNanos = call.timestampNanos;
+
+    ProcessingDetails details = call.getProcessingDetails();
+    // queue time is the delta between when the call first arrived and when it
+    // began being serviced, minus the time it took to be put into the queue
+    details.set(Timing.QUEUE,
+        startTime - timestampNanos - details.get(Timing.ENQUEUE));
+    deltaNanos -= details.get(Timing.PROCESSING);
+    deltaNanos -= details.get(Timing.RESPONSE);
+    details.set(Timing.HANDLER, deltaNanos);
+
+    long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
     rpcMetrics.addRpcQueueTime(queueTime);
-    if (!deferredCall) {
-      rpcMetrics.addRpcProcessingTime(processingTime);
-      rpcDetailedMetrics.addProcessingTime(name, processingTime);
-      callQueue.addResponseTime(name, getPriorityLevel(), queueTime,
-          processingTime);
-      if (isLogSlowRPC()) {
-        logSlowRpcCalls(name, processingTime);
-      }
+
+    if (call.isResponseDeferred() || connDropped) {
+      // call was skipped; don't include it in processing metrics
+      return;
+    }
+
+    long processingTime =
+        details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+    long waitTime =
+        details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
+    rpcMetrics.addRpcLockWaitTime(waitTime);
+    rpcMetrics.addRpcProcessingTime(processingTime);
+    // don't include lock wait for detailed metrics.
+    processingTime -= waitTime;
+    String name = call.getDetailedMetricsName();
+    rpcDetailedMetrics.addProcessingTime(name, processingTime);
+    callQueue.addResponseTime(name, call, details);
+    if (isLogSlowRPC()) {
+      logSlowRpcCalls(name, call, processingTime);
     }
   }
 
@@ -693,9 +715,13 @@ public abstract class Server {
   /** A generic call queued for handling. */
   public static class Call implements Schedulable,
   PrivilegedExceptionAction<Void> {
+    private final ProcessingDetails processingDetails =
+        new ProcessingDetails(TimeUnit.NANOSECONDS);
+    // the method name to use in metrics
+    private volatile String detailedMetricsName = "";
     final int callId;            // the client's call id
     final int retryCount;        // the retry count of the call
-    long timestamp;              // time received when response is null
+    long timestampNanos;         // time received when response is null
                                  // time served when response is not null
     private AtomicInteger responseWaitCount = new AtomicInteger(1);
     final RPC.RpcKind rpcKind;
@@ -732,7 +758,7 @@ public abstract class Server {
         TraceScope traceScope, CallerContext callerContext) {
       this.callId = id;
       this.retryCount = retryCount;
-      this.timestamp = Time.now();
+      this.timestampNanos = Time.monotonicNowNanos();
       this.rpcKind = kind;
       this.clientId = clientId;
       this.traceScope = traceScope;
@@ -741,6 +767,28 @@ public abstract class Server {
       this.isCallCoordinated = false;
     }
 
+    /**
+     * Indicates whether the call has been processed. Always true unless
+     * overridden.
+     *
+     * @return true
+     */
+    boolean isOpen() {
+      return true;
+    }
+
+    String getDetailedMetricsName() {
+      return detailedMetricsName;
+    }
+
+    void setDetailedMetricsName(String name) {
+      detailedMetricsName = name;
+    }
+
+    public ProcessingDetails getProcessingDetails() {
+      return processingDetails;
+    }
+
     @Override
     public String toString() {
       return "Call#" + callId + " Retry#" + retryCount;
@@ -888,6 +936,11 @@ public abstract class Server {
       this.rpcRequest = param;
     }
 
+    @Override
+    boolean isOpen() {
+      return connection.channel.isOpen();
+    }
+
     void setResponseFields(Writable returnValue,
                            ResponseParams responseParams) {
       this.rv = returnValue;
@@ -915,18 +968,33 @@ public abstract class Server {
         Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
         return null;
       }
+
+      long startNanos = Time.monotonicNowNanos();
       Writable value = null;
       ResponseParams responseParams = new ResponseParams();
 
       try {
         value = call(
-            rpcKind, connection.protocolName, rpcRequest, timestamp);
+            rpcKind, connection.protocolName, rpcRequest, timestampNanos);
       } catch (Throwable e) {
         populateResponseParamsOnError(e, responseParams);
       }
       if (!isResponseDeferred()) {
+        long deltaNanos = Time.monotonicNowNanos() - startNanos;
+        ProcessingDetails details = getProcessingDetails();
+
+        details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
+        details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
+        startNanos = Time.monotonicNowNanos();
+
         setResponseFields(value, responseParams);
         sendResponse();
+
+        deltaNanos = Time.monotonicNowNanos() - startNanos;
+        details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Deferring response for callId: " + this.callId);
@@ -1346,12 +1414,13 @@ public abstract class Server {
     }
   }
 
+  private final static long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert(
+      15, TimeUnit.MINUTES);
+
   // Sends responses of RPC back to clients.
   private class Responder extends Thread {
     private final Selector writeSelector;
     private int pending;         // connections waiting to register
-    
-    final static int PURGE_INTERVAL = 900000; // 15mins
 
     Responder() throws IOException {
       this.setName("IPC Server Responder");
@@ -1377,12 +1446,13 @@ public abstract class Server {
     }
     
     private void doRunLoop() {
-      long lastPurgeTime = 0;   // last check for old calls.
+      long lastPurgeTimeNanos = 0;   // last check for old calls.
 
       while (running) {
         try {
           waitPending();     // If a channel is being registered, wait.
-          writeSelector.select(PURGE_INTERVAL);
+          writeSelector.select(
+              TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
           while (iter.hasNext()) {
             SelectionKey key = iter.next();
@@ -1404,11 +1474,11 @@ public abstract class Server {
              LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
            }
           }
-          long now = Time.now();
-          if (now < lastPurgeTime + PURGE_INTERVAL) {
+          long nowNanos = Time.monotonicNowNanos();
+          if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
             continue;
           }
-          lastPurgeTime = now;
+          lastPurgeTimeNanos = nowNanos;
           //
           // If there were some calls that have not been sent out for a
           // long time, discard them.
@@ -1432,7 +1502,7 @@ public abstract class Server {
           }
 
           for (RpcCall call : calls) {
-            doPurge(call, now);
+            doPurge(call, nowNanos);
           }
         } catch (OutOfMemoryError e) {
           //
@@ -1483,7 +1553,7 @@ public abstract class Server {
         Iterator<RpcCall> iter = responseQueue.listIterator(0);
         while (iter.hasNext()) {
           call = iter.next();
-          if (now > call.timestamp + PURGE_INTERVAL) {
+          if (now > call.timestampNanos + PURGE_INTERVAL_NANOS) {
             closeConnection(call.connection);
             break;
           }
@@ -1547,7 +1617,7 @@ public abstract class Server {
             
             if (inHandler) {
               // set the serve time when the response has to be sent later
-              call.timestamp = Time.now();
+              call.timestampNanos = Time.monotonicNowNanos();
               
               incPending();
               try {
@@ -2731,6 +2801,9 @@ public abstract class Server {
       } else {
         callQueue.add(call);
       }
+      long deltaNanos = Time.monotonicNowNanos() - call.timestampNanos;
+      call.getProcessingDetails().set(Timing.ENQUEUE, deltaNanos,
+          TimeUnit.NANOSECONDS);
     } catch (CallQueueOverflowException cqe) {
       // If rpc scheduler indicates back off based on performance degradation
       // such as response time or rpc queue is full, we will ask the client
@@ -2757,8 +2830,16 @@ public abstract class Server {
       SERVER.set(Server.this);
       while (running) {
         TraceScope traceScope = null;
+        Call call = null;
+        long startTimeNanos = 0;
+        // True iff the connection for this call has been dropped.
+        // Set to true by default and updated to false later if the connection
+        // can be successfully read.
+        boolean connDropped = true;
+
         try {
-          final Call call = callQueue.take(); // pop the queue; maybe blocked here
+          call = callQueue.take(); // pop the queue; maybe blocked here
+          startTimeNanos = Time.monotonicNowNanos();
           if (alignmentContext != null && call.isCallCoordinated() &&
               call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
             /*
@@ -2789,6 +2870,7 @@ public abstract class Server {
           // always update the current call context
           CallerContext.setCurrent(call.callerContext);
           UserGroupInformation remoteUser = call.getRemoteUser();
+          connDropped = !call.isOpen();
           if (remoteUser != null) {
             remoteUser.doAs(call);
           } else {
@@ -2811,6 +2893,14 @@ public abstract class Server {
         } finally {
           CurCall.set(null);
           IOUtils.cleanupWithLogger(LOG, traceScope);
+          if (call != null) {
+            updateMetrics(call, startTimeNanos, connDropped);
+            ProcessingDetails.LOG.debug(
+                "Served: [{}]{} name={} user={} details={}",
+                call, (call.isResponseDeferred() ? ", deferred" : ""),
+                call.getDetailedMetricsName(), call.getRemoteUser(),
+                call.getProcessingDetails());
+          }
         }
       }
       LOG.debug(Thread.currentThread().getName() + ": exiting");
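
Taken together, the Server changes maintain two identities: a handler's elapsed time splits into HANDLER + PROCESSING + RESPONSE (QUEUE is computed separately as the arrival-to-service delta minus ENQUEUE), and PROCESSING splits into LOCKFREE + LOCKWAIT + LOCKSHARED + LOCKEXCLUSIVE. A small sketch with hypothetical numbers, again assuming in-package code since the ProcessingDetails constructor is package-private:

package org.apache.hadoop.ipc;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.metrics.RpcMetrics;

/** Hypothetical numbers illustrating the decomposition updateMetrics relies on. */
public class TimingDecompositionExample {
  public static void main(String[] args) {
    ProcessingDetails details = new ProcessingDetails(TimeUnit.NANOSECONDS);

    // Suppose call() ran for 1,000,000 ns and the lock instrumentation
    // (e.g. FSNamesystemLock below) attributed these slices:
    details.set(Timing.PROCESSING, 1_000_000);
    details.set(Timing.LOCKWAIT, 100_000);
    details.set(Timing.LOCKSHARED, 200_000);
    details.set(Timing.LOCKEXCLUSIVE, 300_000);

    // RpcCall.run() derives LOCKFREE as the remainder, so that
    // PROCESSING == LOCKFREE + LOCKWAIT + LOCKSHARED + LOCKEXCLUSIVE:
    details.set(Timing.LOCKFREE, details.get(Timing.PROCESSING)
        - details.get(Timing.LOCKWAIT)
        - details.get(Timing.LOCKSHARED)
        - details.get(Timing.LOCKEXCLUSIVE));       // 400,000 ns

    // updateMetrics() reports in RpcMetrics.TIMEUNIT (milliseconds) and
    // subtracts lock wait from the per-method detailed metric:
    long processingMs = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
    long lockWaitMs = details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
    System.out.println("processing=" + processingMs
        + "ms, detailed=" + (processingMs - lockWaitMs) + "ms");
  }
}
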
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 2e3b559..6a5ac5b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -537,15 +537,15 @@ public class WritableRpcEngine implements RpcEngine {
         }
 
         // Invoke the protocol method
-        long startTime = Time.now();
-        int qTime = (int) (startTime-receivedTime);
         Exception exception = null;
+        Call currentCall = Server.getCurCall().get();
         try {
           Method method =
               protocolImpl.protocolClass.getMethod(call.getMethodName(),
               call.getParameterClasses());
           method.setAccessible(true);
           server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          currentCall.setDetailedMetricsName(call.getMethodName());
           Object value = 
               method.invoke(protocolImpl.protocolImpl, call.getParameters());
           if (server.verbose) log("Return: "+value);
@@ -571,20 +571,10 @@ public class WritableRpcEngine implements RpcEngine {
           exception = ioe;
           throw ioe;
         } finally {
-          int processingTime = (int) (Time.now() - startTime);
-          if (LOG.isDebugEnabled()) {
-            String msg = "Served: " + call.getMethodName() +
-                " queueTime= " + qTime + " procesingTime= " + processingTime;
-            if (exception != null) {
-              msg += " exception= " + exception.getClass().getSimpleName();
-            }
-            LOG.debug(msg);
+          if (exception != null) {
+            currentCall.setDetailedMetricsName(
+                exception.getClass().getSimpleName());
           }
-          String detailedMetricsName = (exception == null) ?
-              call.getMethodName() :
-              exception.getClass().getSimpleName();
-          server
-              .updateMetrics(detailedMetricsName, qTime, processingTime, false);
         }
       }
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
index e50895b..67ae4cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
@@ -66,12 +66,12 @@ public class RpcDetailedMetrics {
 
   /**
    * Add an RPC processing time sample
-   * @param name  of the RPC call
+   * @param rpcCallName of the RPC call
    * @param processingTime  the processing time
    */
   //@Override // some instrumentation interface
-  public void addProcessingTime(String name, int processingTime) {
-    rates.add(name, processingTime);
+  public void addProcessingTime(String rpcCallName, long processingTime) {
+    rates.add(rpcCallName, processingTime);
   }
 
   public void addDeferredProcessingTime(String name, long processingTime) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 84ca9fe..bb4bfcf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ipc.metrics;
 
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.Server;
@@ -27,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
@@ -47,6 +48,8 @@ public class RpcMetrics {
   final MetricsRegistry registry;
   final String name;
   final boolean rpcQuantileEnable;
+  /** The time unit used when storing/accessing time durations. */
+  public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
   
   RpcMetrics(Server server, Configuration conf) {
     String port = String.valueOf(server.getListenerAddress().getPort());
@@ -61,24 +64,31 @@ public class RpcMetrics {
         CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
         CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
     if (rpcQuantileEnable) {
-      rpcQueueTimeMillisQuantiles =
+      rpcQueueTimeQuantiles =
+          new MutableQuantiles[intervals.length];
+      rpcLockWaitTimeQuantiles =
           new MutableQuantiles[intervals.length];
-      rpcProcessingTimeMillisQuantiles =
+      rpcProcessingTimeQuantiles =
           new MutableQuantiles[intervals.length];
-      deferredRpcProcessingTimeMillisQuantiles =
+      deferredRpcProcessingTimeQuantiles =
           new MutableQuantiles[intervals.length];
       for (int i = 0; i < intervals.length; i++) {
         int interval = intervals[i];
-        rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime"
-            + interval + "s", "rpc queue time in milli second", "ops",
+        rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+            + interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+            "latency", interval);
+        rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
+            "rpcLockWaitTime" + interval + "s",
+            "rpc lock wait time in " + TIMEUNIT, "ops",
             "latency", interval);
-        rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles(
+        rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
             "rpcProcessingTime" + interval + "s",
-            "rpc processing time in milli second", "ops", "latency", interval);
-        deferredRpcProcessingTimeMillisQuantiles[i] = registry
-            .newQuantiles("deferredRpcProcessingTime" + interval + "s",
-                "deferred rpc processing time in milli seconds", "ops",
-                "latency", interval);
+            "rpc processing time in " + TIMEUNIT, "ops",
+            "latency", interval);
+        deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
+            "deferredRpcProcessingTime" + interval + "s",
+            "deferred rpc processing time in " + TIMEUNIT, "ops",
+            "latency", interval);
       }
     }
     LOG.debug("Initialized " + registry);
@@ -94,11 +104,13 @@ public class RpcMetrics {
   @Metric("Number of received bytes") MutableCounterLong receivedBytes;
   @Metric("Number of sent bytes") MutableCounterLong sentBytes;
   @Metric("Queue time") MutableRate rpcQueueTime;
-  MutableQuantiles[] rpcQueueTimeMillisQuantiles;
+  MutableQuantiles[] rpcQueueTimeQuantiles;
+  @Metric("Lock wait time") MutableRate rpcLockWaitTime;
+  MutableQuantiles[] rpcLockWaitTimeQuantiles;
   @Metric("Processing time") MutableRate rpcProcessingTime;
-  MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
+  MutableQuantiles[] rpcProcessingTimeQuantiles;
   @Metric("Deferred Processing time") MutableRate deferredRpcProcessingTime;
-  MutableQuantiles[] deferredRpcProcessingTimeMillisQuantiles;
+  MutableQuantiles[] deferredRpcProcessingTimeQuantiles;
   @Metric("Number of authentication failures")
   MutableCounterLong rpcAuthenticationFailures;
   @Metric("Number of authentication successes")
@@ -196,25 +208,32 @@ public class RpcMetrics {
    * Add an RPC queue time sample
    * @param qTime the queue time
    */
-  //@Override
-  public void addRpcQueueTime(int qTime) {
+  public void addRpcQueueTime(long qTime) {
     rpcQueueTime.add(qTime);
     if (rpcQuantileEnable) {
-      for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) {
+      for (MutableQuantiles q : rpcQueueTimeQuantiles) {
         q.add(qTime);
       }
     }
   }
 
+  public void addRpcLockWaitTime(long waitTime) {
+    rpcLockWaitTime.add(waitTime);
+    if (rpcQuantileEnable) {
+      for (MutableQuantiles q : rpcLockWaitTimeQuantiles) {
+        q.add(waitTime);
+      }
+    }
+  }
+
   /**
    * Add an RPC processing time sample
    * @param processingTime the processing time
    */
-  //@Override
-  public void addRpcProcessingTime(int processingTime) {
+  public void addRpcProcessingTime(long processingTime) {
     rpcProcessingTime.add(processingTime);
     if (rpcQuantileEnable) {
-      for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) {
+      for (MutableQuantiles q : rpcProcessingTimeQuantiles) {
         q.add(processingTime);
       }
     }
@@ -223,7 +242,7 @@ public class RpcMetrics {
   public void addDeferredRpcProcessingTime(long processingTime) {
     deferredRpcProcessingTime.add(processingTime);
     if (rpcQuantileEnable) {
-      for (MutableQuantiles q : deferredRpcProcessingTimeMillisQuantiles) {
+      for (MutableQuantiles q : deferredRpcProcessingTimeQuantiles) {
         q.add(processingTime);
       }
     }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 2351d97..e611cdb 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -71,6 +71,8 @@ Each metrics record contains tags such as Hostname and port (number to which ser
 | `SentBytes` | Total number of sent bytes |
 | `RpcQueueTimeNumOps` | Total number of RPC calls |
 | `RpcQueueTimeAvgTime` | Average queue time in milliseconds |
+| `RpcLockWaitTimeNumOps` | Total number of RPC calls (same as RpcQueueTimeNumOps) |
+| `RpcLockWaitTimeAvgTime` | Average time waiting for lock acquisition in milliseconds |
 | `RpcProcessingTimeNumOps` | Total number of RPC calls (same to RpcQueueTimeNumOps) |
 | `RpcProcessingAvgTime` | Average Processing time in milliseconds |
 | `RpcAuthenticationFailures` | Total number of authentication failures |
@@ -92,6 +94,12 @@ Each metrics record contains tags such as Hostname and port (number to which ser
 | `rpcProcessingTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 | `rpcProcessingTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 | `rpcProcessingTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s50thPercentileLatency` | Shows the 50th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s75thPercentileLatency` | Shows the 75th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 
 RetryCache/NameNodeRetryCache
 -----------------------------
@@ -118,6 +126,7 @@ rpcdetailed context
 ===================
 
 Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds.
+Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime).
 
 rpcdetailed
 -----------
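
The quantile rows above are emitted only when quantiles are enabled. Below is a minimal sketch of enabling them programmatically; the boolean constant appears in the RpcMetrics hunk above, and the intervals key name is taken from the Metrics.md text.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

/** Sketch: turn on the RPC latency quantile series documented above. */
public class EnableRpcQuantiles {
  public static Configuration create() {
    Configuration conf = new Configuration();
    // Constant referenced by RpcMetrics above
    // ("rpc.metrics.quantile.enable").
    conf.setBoolean(CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, true);
    // Key name as given in the Metrics.md rows above; 60s and 300s windows.
    conf.set("rpc.metrics.percentiles.intervals", "60,300");
    return conf;
  }
}
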
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProcessingDetails.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProcessingDetails.java
new file mode 100644
index 0000000..0ecc741
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProcessingDetails.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for ProcessingDetails time unit conversion and output.
+ */
+public class TestProcessingDetails {
+
+  /**
+   * Test that the conversion of time values in various units in and out of the
+   * details are done properly.
+   */
+  @Test
+  public void testTimeConversion() {
+    ProcessingDetails details = new ProcessingDetails(TimeUnit.MICROSECONDS);
+
+    details.set(Timing.ENQUEUE, 10);
+    assertEquals(10, details.get(Timing.ENQUEUE));
+    assertEquals(10_000, details.get(Timing.ENQUEUE, TimeUnit.NANOSECONDS));
+
+    details.set(Timing.QUEUE, 20, TimeUnit.MILLISECONDS);
+    details.add(Timing.QUEUE, 20, TimeUnit.MICROSECONDS);
+    assertEquals(20_020, details.get(Timing.QUEUE));
+    assertEquals(0, details.get(Timing.QUEUE, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testToString() {
+    ProcessingDetails details = new ProcessingDetails(TimeUnit.MICROSECONDS);
+    details.set(Timing.ENQUEUE, 10);
+    details.set(Timing.QUEUE, 20, TimeUnit.MILLISECONDS);
+
+    assertEquals("enqueueTime=10 queueTime=20000 handlerTime=0 " +
+        "processingTime=0 lockfreeTime=0 lockwaitTime=0 locksharedTime=0 " +
+        "lockexclusiveTime=0 responseTime=0", details.toString());
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
index 5fbd957..fd6a7ae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +42,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -215,7 +217,8 @@ public class TestProtoBufRpc extends TestRpcBase {
   }
 
   @Test(timeout = 12000)
-  public void testLogSlowRPC() throws IOException, ServiceException {
+  public void testLogSlowRPC() throws IOException, ServiceException,
+      TimeoutException, InterruptedException {
     TestRpcService2 client = getClient2();
     // make 10 K fast calls
     for (int x = 0; x < 10000; x++) {
@@ -234,9 +237,9 @@ public class TestProtoBufRpc extends TestRpcBase {
     // make a really slow call. Sleep sleeps for 1000ms
     client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
 
-    long after = rpcMetrics.getRpcSlowCalls();
     // Ensure slow call is logged.
-    Assert.assertEquals(before + 1L, after);
+    GenericTestUtils.waitFor(()
+        -> rpcMetrics.getRpcSlowCalls() == before + 1L, 10, 1000);
   }
 
   @Test(timeout = 12000)
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 7b4f690..97bb954 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -87,6 +87,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -1072,10 +1074,14 @@ public class TestRPC extends TestRpcBase {
       }
       MetricsRecordBuilder rpcMetrics =
           getMetrics(server.getRpcMetrics().name());
-      assertTrue("Expected non-zero rpc queue time",
-          getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0);
-      assertTrue("Expected non-zero rpc processing time",
-          getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0);
+      assertEquals("Expected correct rpc queue count",
+          3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
+      assertEquals("Expected correct rpc processing count",
+          3000, getLongCounter("RpcProcessingTimeNumOps", rpcMetrics));
+      assertEquals("Expected correct rpc lock wait count",
+          3000, getLongCounter("RpcLockWaitTimeNumOps", rpcMetrics));
+      assertEquals("Expected zero rpc lock wait time",
+          0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
       MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
           rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
@@ -1086,6 +1092,10 @@ public class TestRPC extends TestRpcBase {
           UserGroupInformation.getCurrentUser().getShortUserName();
       assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
       assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
+
+      proxy.lockAndSleep(null, newSleepRequest(5));
+      rpcMetrics = getMetrics(server.getRpcMetrics().name());
+      assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
     } finally {
       if (proxy2 != null) {
         RPC.stopProxy(proxy2);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 0d2f975..2f2d36f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -21,12 +21,16 @@ package org.apache.hadoop.ipc;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 
 import org.apache.hadoop.io.Text;
@@ -278,6 +282,7 @@ public class TestRpcBase {
   public static class PBServerImpl implements TestRpcService {
     CountDownLatch fastPingCounter = new CountDownLatch(2);
     private List<Server.Call> postponedCalls = new ArrayList<>();
+    private final Lock lock = new ReentrantLock();
 
     @Override
     public TestProtos.EmptyResponseProto ping(RpcController unused,
@@ -389,6 +394,29 @@ public class TestRpcBase {
     }
 
     @Override
+    public TestProtos.EmptyResponseProto lockAndSleep(
+        RpcController controller, TestProtos.SleepRequestProto request)
+        throws ServiceException {
+      ProcessingDetails details =
+          Server.getCurCall().get().getProcessingDetails();
+      lock.lock();
+      long startNanos = Time.monotonicNowNanos();
+      try {
+        Thread.sleep(request.getMilliSeconds());
+      } catch (InterruptedException ignore) {
+        // ignore
+      } finally {
+        lock.unlock();
+      }
+      // Add some arbitrary large lock wait time since in any test scenario
+      // the lock wait time will probably actually be too small to notice
+      details.add(ProcessingDetails.Timing.LOCKWAIT, 10, TimeUnit.SECONDS);
+      details.add(ProcessingDetails.Timing.LOCKEXCLUSIVE,
+          Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
+      return  TestProtos.EmptyResponseProto.newBuilder().build();
+    }
+
+    @Override
     public TestProtos.AuthMethodResponseProto getAuthMethod(
         RpcController controller, TestProtos.EmptyRequestProto request)
         throws ServiceException {
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
index 3746411..0df67a0 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
@@ -39,6 +39,7 @@ service TestProtobufRpcProto {
   rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
   rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
   rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
+  rpc lockAndSleep(SleepRequestProto) returns (EmptyResponseProto);
   rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
   rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
   rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index 7c28465..98b4e6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 import org.apache.hadoop.util.StringUtils;
@@ -41,6 +42,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORT
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
 import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 /**
@@ -142,17 +144,11 @@ class FSNamesystemLock {
   }
 
   public void readLock() {
-    coarseLock.readLock().lock();
-    if (coarseLock.getReadHoldCount() == 1) {
-      readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
-    }
+    doLock(false);
   }
 
   public void readLockInterruptibly() throws InterruptedException {
-    coarseLock.readLock().lockInterruptibly();
-    if (coarseLock.getReadHoldCount() == 1) {
-      readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
-    }
+    doLockInterruptibly(false);
   }
 
   public void readUnlock() {
@@ -204,17 +200,11 @@ class FSNamesystemLock {
   }
   
   public void writeLock() {
-    coarseLock.writeLock().lock();
-    if (coarseLock.getWriteHoldCount() == 1) {
-      writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
-    }
+    doLock(true);
   }
 
   public void writeLockInterruptibly() throws InterruptedException {
-    coarseLock.writeLock().lockInterruptibly();
-    if (coarseLock.getWriteHoldCount() == 1) {
-      writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
-    }
+    doLockInterruptibly(true);
   }
 
   /**
@@ -316,6 +306,50 @@ class FSNamesystemLock {
       String overallMetric = getMetricName(OVERALL_METRIC_NAME, isWrite);
       detailedHoldTimeMetrics.add(overallMetric, value);
     }
+    updateProcessingDetails(
+        isWrite ? Timing.LOCKEXCLUSIVE : Timing.LOCKSHARED, value);
+  }
+
+  private void doLock(boolean isWrite) {
+    long startNanos = timer.monotonicNowNanos();
+    if (isWrite) {
+      coarseLock.writeLock().lock();
+    } else {
+      coarseLock.readLock().lock();
+    }
+    updateLockWait(startNanos, isWrite);
+  }
+
+  private void doLockInterruptibly(boolean isWrite)
+      throws InterruptedException {
+    long startNanos = timer.monotonicNowNanos();
+    if (isWrite) {
+      coarseLock.writeLock().lockInterruptibly();
+    } else {
+      coarseLock.readLock().lockInterruptibly();
+    }
+    updateLockWait(startNanos, isWrite);
+  }
+
+  private void updateLockWait(long startNanos, boolean isWrite) {
+    long now = timer.monotonicNowNanos();
+    updateProcessingDetails(Timing.LOCKWAIT, now - startNanos);
+    if (isWrite) {
+      if (coarseLock.getWriteHoldCount() == 1) {
+        writeLockHeldTimeStampNanos = now;
+      }
+    } else {
+      if (coarseLock.getReadHoldCount() == 1) {
+        readLockHeldTimeStampNanos.set(now);
+      }
+    }
+  }
+
+  private static void updateProcessingDetails(Timing type, long deltaNanos) {
+    Server.Call call = Server.getCurCall().get();
+    if (call != null) {
+      call.getProcessingDetails().add(type, deltaNanos, TimeUnit.NANOSECONDS);
+    }
   }
 
   private static String getMetricName(String operationName, boolean isWrite) {
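
The same attribution pattern generalizes to any lock guarding server-side state. A hypothetical wrapper (not part of this patch) mirroring updateLockWait above: time the acquire and charge it to LOCKWAIT on the current call, then start the hold-time clock only after acquisition so wait and hold do not overlap.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.Time;

/** Hypothetical instrumented write lock following the FSNamesystemLock pattern. */
public class InstrumentedWriteLock {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private long writeHeldSinceNanos; // guarded by the write lock

  public void lock() {
    long startNanos = Time.monotonicNowNanos();
    lock.writeLock().lock();
    long now = Time.monotonicNowNanos();
    charge(Timing.LOCKWAIT, now - startNanos);
    if (lock.getWriteHoldCount() == 1) {
      writeHeldSinceNanos = now; // hold time starts after the wait ends
    }
  }

  public void unlock() {
    if (lock.getWriteHoldCount() == 1) {
      charge(Timing.LOCKEXCLUSIVE,
          Time.monotonicNowNanos() - writeHeldSinceNanos);
    }
    lock.writeLock().unlock();
  }

  private static void charge(Timing type, long deltaNanos) {
    Server.Call call = Server.getCurCall().get();
    if (call != null) { // null outside an IPC handler thread
      call.getProcessingDetails().add(type, deltaNanos, TimeUnit.NANOSECONDS);
    }
  }
}
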
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 1ec47ca..5cd0fa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -374,11 +374,6 @@ public class TestConsistentReadsObserver {
     }
 
     @Override
-    public void addResponseTime(String name, int priorityLevel, int queueTime,
-        int processingTime) {
-    }
-
-    @Override
     public void stop() {
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

