List: hadoop-commits
Subject: [hadoop] branch branch-3.2 updated: HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer. Contributed by Christopher Gregorian.
From: xkrogen@apache.org
Date: 2019-07-30 21:45:21
Message-ID: 156452312088.15624.10659352110699389767@gitbox.apache.org
This is an automated email from the ASF dual-hosted git repository.
xkrogen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d4492bd HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer. Contributed by Christopher Gregorian.
d4492bd is described below
commit d4492bdd9edec60c236aff85de50b963097e5a7f
Author: Christopher Gregorian <csgregorian@gmail.com>
AuthorDate: Mon Apr 29 15:37:25 2019 -0700
HADOOP-16459. Backport of HADOOP-16266. Add more fine-grained processing time metrics to the RPC layer. Contributed by Christopher Gregorian.
This commit also includes the follow-on commit 827a84778a4e3b8f165806dfd2966f0951a5e575.
(cherry-picked from f96a2df38d889f29314c57f4d94227b2e419a11f)
---
.../org/apache/hadoop/ipc/CallQueueManager.java | 5 +-
.../org/apache/hadoop/ipc/DecayRpcScheduler.java | 12 +-
.../org/apache/hadoop/ipc/DefaultRpcScheduler.java | 4 +-
.../java/org/apache/hadoop/ipc/ExternalCall.java | 5 +
.../org/apache/hadoop/ipc/ProcessingDetails.java | 96 +++++++++++++
.../org/apache/hadoop/ipc/ProtobufRpcEngine.java | 31 +----
.../java/org/apache/hadoop/ipc/RpcScheduler.java | 41 +++++-
.../main/java/org/apache/hadoop/ipc/Server.java | 152 ++++++++++++++++-----
.../org/apache/hadoop/ipc/WritableRpcEngine.java | 20 +--
.../hadoop/ipc/metrics/RpcDetailedMetrics.java | 6 +-
.../org/apache/hadoop/ipc/metrics/RpcMetrics.java | 63 ++++++---
.../hadoop-common/src/site/markdown/Metrics.md | 9 ++
.../apache/hadoop/ipc/TestProcessingDetails.java | 61 +++++++++
.../org/apache/hadoop/ipc/TestProtoBufRpc.java | 9 +-
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 18 ++-
.../java/org/apache/hadoop/ipc/TestRpcBase.java | 28 ++++
.../src/test/proto/test_rpc_service.proto | 1 +
.../hdfs/server/namenode/FSNamesystemLock.java | 66 ++++++---
.../namenode/ha/TestConsistentReadsObserver.java | 5 -
19 files changed, 499 insertions(+), 133 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 9731e13..e18f307 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -193,9 +193,8 @@ public class CallQueueManager<E extends Schedulable>
return scheduler.shouldBackOff(e);
}
- void addResponseTime(String name, int priorityLevel, int queueTime,
- int processingTime) {
- scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
+ void addResponseTime(String name, Schedulable e, ProcessingDetails details) {
+ scheduler.addResponseTime(name, e, details);
}
// This should be only called once per call and cached in the call object
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 8bb0ce4..3e3c1f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -55,6 +55,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+
/**
* The decay RPC scheduler counts incoming requests in a map, then
* decays the counts at a fixed time interval. The scheduler is optimized
@@ -600,14 +602,18 @@ public class DecayRpcScheduler implements RpcScheduler,
}
@Override
- public void addResponseTime(String name, int priorityLevel, int queueTime,
- int processingTime) {
+ public void addResponseTime(String callName, Schedulable schedulable,
+ ProcessingDetails details) {
+ int priorityLevel = schedulable.getPriorityLevel();
+ long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
+ long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
+
responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
queueTime+processingTime);
if (LOG.isDebugEnabled()) {
LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " +
- "processingTime: {} ", name, priorityLevel, queueTime,
+ "processingTime: {} ", callName, priorityLevel, queueTime,
processingTime);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
index 0847af7..696160e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
@@ -35,8 +35,8 @@ public class DefaultRpcScheduler implements RpcScheduler {
}
@Override
- public void addResponseTime(String name, int priorityLevel, int queueTime,
- int processingTime) {
+ public void addResponseTime(String callName, Schedulable schedulable,
+ ProcessingDetails details) {
}
public DefaultRpcScheduler(int priorityLevels, String namespace,
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
index 5cc3665..39e5534 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -37,6 +37,11 @@ public abstract class ExternalCall<T> extends Call {
this.action = action;
}
+ @Override
+ public String getDetailedMetricsName() {
+ return "(external)";
+ }
+
public abstract UserGroupInformation getRemoteUser();
public final T get() throws InterruptedException, ExecutionException {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java
new file mode 100644
index 0000000..5b97eec
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stores the times that a call takes to be processed through each step.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public class ProcessingDetails {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ProcessingDetails.class);
+ private final TimeUnit valueTimeUnit;
+
+ /**
+ * The different stages to track the time of.
+ */
+ public enum Timing {
+ ENQUEUE, // time for reader to insert in call queue.
+ QUEUE, // time in the call queue.
+ HANDLER, // handler overhead not spent in processing/response.
+ PROCESSING, // time handler spent processing the call. always equal to
+ // lock_free + lock_wait + lock_shared + lock_exclusive
+ LOCKFREE, // processing with no lock.
+ LOCKWAIT, // processing while waiting for lock.
+ LOCKSHARED, // processing with a read lock.
+ LOCKEXCLUSIVE, // processing with a write lock.
+ RESPONSE; // time to encode and send response.
+ }
+
+ private long[] timings = new long[Timing.values().length];
+
+ ProcessingDetails(TimeUnit timeUnit) {
+ this.valueTimeUnit = timeUnit;
+ }
+
+ public long get(Timing type) {
+ // When using nanoTime to fetch timing information, it is possible to see
+ // time "move backward" slightly under unusual/rare circumstances. To avoid
+ // displaying a confusing number, round such timings to 0 here.
+ long ret = timings[type.ordinal()];
+ return ret < 0 ? 0 : ret;
+ }
+
+ public long get(Timing type, TimeUnit timeUnit) {
+ return timeUnit.convert(get(type), valueTimeUnit);
+ }
+
+ public void set(Timing type, long value) {
+ timings[type.ordinal()] = value;
+ }
+
+ public void set(Timing type, long value, TimeUnit timeUnit) {
+ set(type, valueTimeUnit.convert(value, timeUnit));
+ }
+
+ public void add(Timing type, long value, TimeUnit timeUnit) {
+ timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(256);
+ for (Timing type : Timing.values()) {
+ if (sb.length() > 0) {
+ sb.append(" ");
+ }
+ sb.append(type.name().toLowerCase())
+ .append("Time=").append(get(type));
+ }
+ return sb.toString();
+ }
+}
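For reference, a minimal sketch (not part of this commit; the example class name is hypothetical) of how the new ProcessingDetails class behaves. The constructor is package-private, so code constructing it directly must live in org.apache.hadoop.ipc; all timings are stored in the unit given to the constructor and converted on set/add/get:

    package org.apache.hadoop.ipc;

    import java.util.concurrent.TimeUnit;

    public class ProcessingDetailsExample {
      public static void main(String[] args) {
        // The RPC server keeps per-call timings in nanoseconds internally.
        ProcessingDetails details = new ProcessingDetails(TimeUnit.NANOSECONDS);
        // Record 5 ms of queue time and accumulate 250 us of lock wait.
        details.set(ProcessingDetails.Timing.QUEUE, 5, TimeUnit.MILLISECONDS);
        details.add(ProcessingDetails.Timing.LOCKWAIT, 250, TimeUnit.MICROSECONDS);
        // Consumers read values back in whatever unit they need.
        long queueMs = details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
        System.out.println("queueTime=" + queueMs + "ms; " + details);
      }
    }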
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 5548566..15d068b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -520,46 +520,29 @@ public class ProtobufRpcEngine implements RpcEngine {
Message param = request.getValue(prototype);
Message result;
- long startTime = Time.now();
- int qTime = (int) (startTime - receiveTime);
- Exception exception = null;
- boolean isDeferred = false;
+ Call currentCall = Server.getCurCall().get();
try {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
currentCallInfo.set(new CallInfo(server, methodName));
+ currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (currentCallback.get() != null) {
- Server.getCurCall().get().deferResponse();
- isDeferred = true;
+ currentCall.deferResponse();
currentCallback.set(null);
return null;
}
} catch (ServiceException e) {
- exception = (Exception) e.getCause();
+ Exception exception = (Exception) e.getCause();
+ currentCall.setDetailedMetricsName(
+ exception.getClass().getSimpleName());
throw (Exception) e.getCause();
} catch (Exception e) {
- exception = e;
+ currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
throw e;
} finally {
currentCallInfo.set(null);
- int processingTime = (int) (Time.now() - startTime);
- if (LOG.isDebugEnabled()) {
- String msg =
- "Served: " + methodName + (isDeferred ? ", deferred" : "") +
- ", queueTime= " + qTime +
- " procesingTime= " + processingTime;
- if (exception != null) {
- msg += " exception= " + exception.getClass().getSimpleName();
- }
- LOG.debug(msg);
- }
- String detailedMetricsName = (exception == null) ?
- methodName :
- exception.getClass().getSimpleName();
- server.updateMetrics(detailedMetricsName, qTime, processingTime,
- isDeferred);
}
return RpcWritable.wrap(result);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index 95c5a13..63812f4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
+import java.util.concurrent.TimeUnit;
+
/**
* Implement this interface to be used for RPC scheduling and backoff.
*
@@ -30,8 +32,43 @@ public interface RpcScheduler {
boolean shouldBackOff(Schedulable obj);
- void addResponseTime(String name, int priorityLevel, int queueTime,
- int processingTime);
+ /**
+ * This method only exists to maintain backwards compatibility with old
+ * implementations. It will not be called by any Hadoop code, and should not
+ * be implemented by new implementations.
+ *
+ * @deprecated Use
+ * {@link #addResponseTime(String, Schedulable, ProcessingDetails)} instead.
+ */
+ @Deprecated
+ @SuppressWarnings("unused")
+ default void addResponseTime(String name, int priorityLevel, int queueTime,
+ int processingTime) {
+ throw new UnsupportedOperationException(
+ "This method is deprecated: use the other addResponseTime");
+ }
+
+ /**
+ * Store a processing time value for an RPC call into this scheduler.
+ *
+ * @param callName The name of the call.
+ * @param schedulable The schedulable representing the incoming call.
+ * @param details The details of processing time.
+ */
+ @SuppressWarnings("deprecation")
+ default void addResponseTime(String callName, Schedulable schedulable,
+ ProcessingDetails details) {
+ // For the sake of backwards compatibility with old implementations of
+ // this interface, a default implementation is supplied which uses the old
+ // method. All new implementations MUST override this interface and should
+ // NOT use the other addResponseTime method.
+ int queueTimeMs = (int)
+ details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
+ int processingTimeMs = (int)
+ details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS);
+ addResponseTime(callName, schedulable.getPriorityLevel(),
+ queueTimeMs, processingTimeMs);
+ }
void stop();
}
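To illustrate the new scheduler hook, here is a sketch (illustrative only, not part of this commit; the class is hypothetical and not wired into CallQueueManager) of an RpcScheduler that consumes the per-call ProcessingDetails instead of the old int-based signature:

    package org.apache.hadoop.ipc;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class LockWaitAwareScheduler implements RpcScheduler {
      // Total time handlers spent waiting on locks, aggregated across calls.
      private final AtomicLong totalLockWaitMs = new AtomicLong();

      @Override
      public int getPriorityLevel(Schedulable obj) {
        return 0; // single priority level in this sketch
      }

      @Override
      public boolean shouldBackOff(Schedulable obj) {
        return false; // never ask clients to back off
      }

      @Override
      public void addResponseTime(String callName, Schedulable schedulable,
          ProcessingDetails details) {
        // New-style hook: pull whichever timings matter to this scheduler.
        totalLockWaitMs.addAndGet(
            details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.MILLISECONDS));
      }

      @Override
      public void stop() {
      }
    }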
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 9f286ab..cd6d350 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ipc;
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
@@ -63,6 +64,7 @@ import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -491,7 +493,7 @@ public abstract class Server {
* if and only if it falls above 99.7% of requests. We start this logic
* only once we have enough sample size.
*/
- void logSlowRpcCalls(String methodName, int processingTime) {
+ void logSlowRpcCalls(String methodName, Call call, long processingTime) {
final int deviation = 3;
// 1024 for minSampleSize just a guess -- not a number computed based on
@@ -504,27 +506,47 @@ public abstract class Server {
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
- if(LOG.isWarnEnabled()) {
- String client = CurCall.get().toString();
- LOG.warn(
- "Slow RPC : " + methodName + " took " + processingTime +
- " milliseconds to process from client " + client);
- }
+ LOG.warn("Slow RPC : {} took {} {} to process from client {}",
+ methodName, processingTime, RpcMetrics.TIMEUNIT, call);
rpcMetrics.incrSlowRpc();
}
}
- void updateMetrics(String name, int queueTime, int processingTime,
- boolean deferredCall) {
+ void updateMetrics(Call call, long startTime, boolean connDropped) {
+ // delta = handler + processing + response
+ long deltaNanos = Time.monotonicNowNanos() - startTime;
+ long timestampNanos = call.timestampNanos;
+
+ ProcessingDetails details = call.getProcessingDetails();
+ // queue time is the delta between when the call first arrived and when it
+ // began being serviced, minus the time it took to be put into the queue
+ details.set(Timing.QUEUE,
+ startTime - timestampNanos - details.get(Timing.ENQUEUE));
+ deltaNanos -= details.get(Timing.PROCESSING);
+ deltaNanos -= details.get(Timing.RESPONSE);
+ details.set(Timing.HANDLER, deltaNanos);
+
+ long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
rpcMetrics.addRpcQueueTime(queueTime);
- if (!deferredCall) {
- rpcMetrics.addRpcProcessingTime(processingTime);
- rpcDetailedMetrics.addProcessingTime(name, processingTime);
- callQueue.addResponseTime(name, getPriorityLevel(), queueTime,
- processingTime);
- if (isLogSlowRPC()) {
- logSlowRpcCalls(name, processingTime);
- }
+
+ if (call.isResponseDeferred() || connDropped) {
+ // call was skipped; don't include it in processing metrics
+ return;
+ }
+
+ long processingTime =
+ details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+ long waitTime =
+ details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
+ rpcMetrics.addRpcLockWaitTime(waitTime);
+ rpcMetrics.addRpcProcessingTime(processingTime);
+ // don't include lock wait for detailed metrics.
+ processingTime -= waitTime;
+ String name = call.getDetailedMetricsName();
+ rpcDetailedMetrics.addProcessingTime(name, processingTime);
+ callQueue.addResponseTime(name, call, details);
+ if (isLogSlowRPC()) {
+ logSlowRpcCalls(name, call, processingTime);
}
}
@@ -693,9 +715,13 @@ public abstract class Server {
/** A generic call queued for handling. */
public static class Call implements Schedulable,
PrivilegedExceptionAction<Void> {
+ private final ProcessingDetails processingDetails =
+ new ProcessingDetails(TimeUnit.NANOSECONDS);
+ // the method name to use in metrics
+ private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
- long timestamp; // time received when response is null
+ long timestampNanos; // time received when response is null
// time served when response is not null
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
@@ -732,7 +758,7 @@ public abstract class Server {
TraceScope traceScope, CallerContext callerContext) {
this.callId = id;
this.retryCount = retryCount;
- this.timestamp = Time.now();
+ this.timestampNanos = Time.monotonicNowNanos();
this.rpcKind = kind;
this.clientId = clientId;
this.traceScope = traceScope;
@@ -741,6 +767,28 @@ public abstract class Server {
this.isCallCoordinated = false;
}
+ /**
+ * Indicates whether the call has been processed. Always true unless
+ * overridden.
+ *
+ * @return true
+ */
+ boolean isOpen() {
+ return true;
+ }
+
+ String getDetailedMetricsName() {
+ return detailedMetricsName;
+ }
+
+ void setDetailedMetricsName(String name) {
+ detailedMetricsName = name;
+ }
+
+ public ProcessingDetails getProcessingDetails() {
+ return processingDetails;
+ }
+
@Override
public String toString() {
return "Call#" + callId + " Retry#" + retryCount;
@@ -888,6 +936,11 @@ public abstract class Server {
this.rpcRequest = param;
}
+ @Override
+ boolean isOpen() {
+ return connection.channel.isOpen();
+ }
+
void setResponseFields(Writable returnValue,
ResponseParams responseParams) {
this.rv = returnValue;
@@ -915,18 +968,33 @@ public abstract class Server {
Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
return null;
}
+
+ long startNanos = Time.monotonicNowNanos();
Writable value = null;
ResponseParams responseParams = new ResponseParams();
try {
value = call(
- rpcKind, connection.protocolName, rpcRequest, timestamp);
+ rpcKind, connection.protocolName, rpcRequest, timestampNanos);
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
if (!isResponseDeferred()) {
+ long deltaNanos = Time.monotonicNowNanos() - startNanos;
+ ProcessingDetails details = getProcessingDetails();
+
+ details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
+ deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
+ deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
+ deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
+ details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
+ startNanos = Time.monotonicNowNanos();
+
setResponseFields(value, responseParams);
sendResponse();
+
+ deltaNanos = Time.monotonicNowNanos() - startNanos;
+ details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Deferring response for callId: " + this.callId);
@@ -1346,12 +1414,13 @@ public abstract class Server {
}
}
+ private final static long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert(
+ 15, TimeUnit.MINUTES);
+
// Sends responses of RPC back to clients.
private class Responder extends Thread {
private final Selector writeSelector;
private int pending; // connections waiting to register
-
- final static int PURGE_INTERVAL = 900000; // 15mins
Responder() throws IOException {
this.setName("IPC Server Responder");
@@ -1377,12 +1446,13 @@ public abstract class Server {
}
private void doRunLoop() {
- long lastPurgeTime = 0; // last check for old calls.
+ long lastPurgeTimeNanos = 0; // last check for old calls.
while (running) {
try {
waitPending(); // If a channel is being registered, wait.
- writeSelector.select(PURGE_INTERVAL);
+ writeSelector.select(
+ TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
@@ -1404,11 +1474,11 @@ public abstract class Server {
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
}
}
- long now = Time.now();
- if (now < lastPurgeTime + PURGE_INTERVAL) {
+ long nowNanos = Time.monotonicNowNanos();
+ if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
continue;
}
- lastPurgeTime = now;
+ lastPurgeTimeNanos = nowNanos;
//
// If there were some calls that have not been sent out for a
// long time, discard them.
@@ -1432,7 +1502,7 @@ public abstract class Server {
}
for (RpcCall call : calls) {
- doPurge(call, now);
+ doPurge(call, nowNanos);
}
} catch (OutOfMemoryError e) {
//
@@ -1483,7 +1553,7 @@ public abstract class Server {
Iterator<RpcCall> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
- if (now > call.timestamp + PURGE_INTERVAL) {
+ if (now > call.timestampNanos + PURGE_INTERVAL_NANOS) {
closeConnection(call.connection);
break;
}
@@ -1547,7 +1617,7 @@ public abstract class Server {
if (inHandler) {
// set the serve time when the response has to be sent later
- call.timestamp = Time.now();
+ call.timestampNanos = Time.monotonicNowNanos();
incPending();
try {
@@ -2731,6 +2801,9 @@ public abstract class Server {
} else {
callQueue.add(call);
}
+ long deltaNanos = Time.monotonicNowNanos() - call.timestampNanos;
+ call.getProcessingDetails().set(Timing.ENQUEUE, deltaNanos,
+ TimeUnit.NANOSECONDS);
} catch (CallQueueOverflowException cqe) {
// If rpc scheduler indicates back off based on performance degradation
// such as response time or rpc queue is full, we will ask the client
@@ -2757,8 +2830,16 @@ public abstract class Server {
SERVER.set(Server.this);
while (running) {
TraceScope traceScope = null;
+ Call call = null;
+ long startTimeNanos = 0;
+ // True iff the connection for this call has been dropped.
+ // Set to true by default and updated to false later if the connection
+ // can be successfully read.
+ boolean connDropped = true;
+
try {
- final Call call = callQueue.take(); // pop the queue; maybe blocked here
+ call = callQueue.take(); // pop the queue; maybe blocked here
+ startTimeNanos = Time.monotonicNowNanos();
if (alignmentContext != null && call.isCallCoordinated() &&
call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
/*
@@ -2789,6 +2870,7 @@ public abstract class Server {
// always update the current call context
CallerContext.setCurrent(call.callerContext);
UserGroupInformation remoteUser = call.getRemoteUser();
+ connDropped = !call.isOpen();
if (remoteUser != null) {
remoteUser.doAs(call);
} else {
@@ -2811,6 +2893,14 @@ public abstract class Server {
} finally {
CurCall.set(null);
IOUtils.cleanupWithLogger(LOG, traceScope);
+ if (call != null) {
+ updateMetrics(call, startTimeNanos, connDropped);
+ ProcessingDetails.LOG.debug(
+ "Served: [{}]{} name={} user={} details={}",
+ call, (call.isResponseDeferred() ? ", deferred" : ""),
+ call.getDetailedMetricsName(), call.getRemoteUser(),
+ call.getProcessingDetails());
+ }
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 2e3b559..6a5ac5b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -537,15 +537,15 @@ public class WritableRpcEngine implements RpcEngine {
}
// Invoke the protocol method
- long startTime = Time.now();
- int qTime = (int) (startTime-receivedTime);
Exception exception = null;
+ Call currentCall = Server.getCurCall().get();
try {
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+ currentCall.setDetailedMetricsName(call.getMethodName());
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
if (server.verbose) log("Return: "+value);
@@ -571,20 +571,10 @@ public class WritableRpcEngine implements RpcEngine {
exception = ioe;
throw ioe;
} finally {
- int processingTime = (int) (Time.now() - startTime);
- if (LOG.isDebugEnabled()) {
- String msg = "Served: " + call.getMethodName() +
- " queueTime= " + qTime + " procesingTime= " + processingTime;
- if (exception != null) {
- msg += " exception= " + exception.getClass().getSimpleName();
- }
- LOG.debug(msg);
+ if (exception != null) {
+ currentCall.setDetailedMetricsName(
+ exception.getClass().getSimpleName());
}
- String detailedMetricsName = (exception == null) ?
- call.getMethodName() :
- exception.getClass().getSimpleName();
- server
- .updateMetrics(detailedMetricsName, qTime, processingTime, false);
}
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
index e50895b..67ae4cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
@@ -66,12 +66,12 @@ public class RpcDetailedMetrics {
/**
* Add an RPC processing time sample
- * @param name of the RPC call
+ * @param rpcCallName of the RPC call
* @param processingTime the processing time
*/
//@Override // some instrumentation interface
- public void addProcessingTime(String name, int processingTime) {
- rates.add(name, processingTime);
+ public void addProcessingTime(String rpcCallName, long processingTime) {
+ rates.add(rpcCallName, processingTime);
}
public void addDeferredProcessingTime(String name, long processingTime) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 84ca9fe..bb4bfcf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ipc.metrics;
+import java.util.concurrent.TimeUnit;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server;
@@ -27,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
@@ -47,6 +48,8 @@ public class RpcMetrics {
final MetricsRegistry registry;
final String name;
final boolean rpcQuantileEnable;
+ /** The time unit used when storing/accessing time durations. */
+ public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
RpcMetrics(Server server, Configuration conf) {
String port = String.valueOf(server.getListenerAddress().getPort());
@@ -61,24 +64,31 @@ public class RpcMetrics {
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
if (rpcQuantileEnable) {
- rpcQueueTimeMillisQuantiles =
+ rpcQueueTimeQuantiles =
+ new MutableQuantiles[intervals.length];
+ rpcLockWaitTimeQuantiles =
new MutableQuantiles[intervals.length];
- rpcProcessingTimeMillisQuantiles =
+ rpcProcessingTimeQuantiles =
new MutableQuantiles[intervals.length];
- deferredRpcProcessingTimeMillisQuantiles =
+ deferredRpcProcessingTimeQuantiles =
new MutableQuantiles[intervals.length];
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
- rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime"
- + interval + "s", "rpc queue time in milli second", "ops",
+ rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+ + interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+ "latency", interval);
+ rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
+ "rpcLockWaitTime" + interval + "s",
+ "rpc lock wait time in " + TIMEUNIT, "ops",
"latency", interval);
- rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles(
+ rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"rpcProcessingTime" + interval + "s",
- "rpc processing time in milli second", "ops", "latency", interval);
- deferredRpcProcessingTimeMillisQuantiles[i] = registry
- .newQuantiles("deferredRpcProcessingTime" + interval + "s",
- "deferred rpc processing time in milli seconds", "ops",
- "latency", interval);
+ "rpc processing time in " + TIMEUNIT, "ops",
+ "latency", interval);
+ deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
+ "deferredRpcProcessingTime" + interval + "s",
+ "deferred rpc processing time in " + TIMEUNIT, "ops",
+ "latency", interval);
}
}
LOG.debug("Initialized " + registry);
@@ -94,11 +104,13 @@ public class RpcMetrics {
@Metric("Number of received bytes") MutableCounterLong receivedBytes;
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
@Metric("Queue time") MutableRate rpcQueueTime;
- MutableQuantiles[] rpcQueueTimeMillisQuantiles;
+ MutableQuantiles[] rpcQueueTimeQuantiles;
+ @Metric("Lock wait time") MutableRate rpcLockWaitTime;
+ MutableQuantiles[] rpcLockWaitTimeQuantiles;
@Metric("Processing time") MutableRate rpcProcessingTime;
- MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
+ MutableQuantiles[] rpcProcessingTimeQuantiles;
@Metric("Deferred Processing time") MutableRate deferredRpcProcessingTime;
- MutableQuantiles[] deferredRpcProcessingTimeMillisQuantiles;
+ MutableQuantiles[] deferredRpcProcessingTimeQuantiles;
@Metric("Number of authentication failures")
MutableCounterLong rpcAuthenticationFailures;
@Metric("Number of authentication successes")
@@ -196,25 +208,32 @@ public class RpcMetrics {
* Add an RPC queue time sample
* @param qTime the queue time
*/
- //@Override
- public void addRpcQueueTime(int qTime) {
+ public void addRpcQueueTime(long qTime) {
rpcQueueTime.add(qTime);
if (rpcQuantileEnable) {
- for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) {
+ for (MutableQuantiles q : rpcQueueTimeQuantiles) {
q.add(qTime);
}
}
}
+ public void addRpcLockWaitTime(long waitTime) {
+ rpcLockWaitTime.add(waitTime);
+ if (rpcQuantileEnable) {
+ for (MutableQuantiles q : rpcLockWaitTimeQuantiles) {
+ q.add(waitTime);
+ }
+ }
+ }
+
/**
* Add an RPC processing time sample
* @param processingTime the processing time
*/
- //@Override
- public void addRpcProcessingTime(int processingTime) {
+ public void addRpcProcessingTime(long processingTime) {
rpcProcessingTime.add(processingTime);
if (rpcQuantileEnable) {
- for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) {
+ for (MutableQuantiles q : rpcProcessingTimeQuantiles) {
q.add(processingTime);
}
}
@@ -223,7 +242,7 @@ public class RpcMetrics {
public void addDeferredRpcProcessingTime(long processingTime) {
deferredRpcProcessingTime.add(processingTime);
if (rpcQuantileEnable) {
- for (MutableQuantiles q : deferredRpcProcessingTimeMillisQuantiles) {
+ for (MutableQuantiles q : deferredRpcProcessingTimeQuantiles) {
q.add(processingTime);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 2351d97..e611cdb 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -71,6 +71,8 @@ Each metrics record contains tags such as Hostname and port (number to which ser
| `SentBytes` | Total number of sent bytes |
| `RpcQueueTimeNumOps` | Total number of RPC calls |
| `RpcQueueTimeAvgTime` | Average queue time in milliseconds |
+| `RpcLockWaitTimeNumOps` | Total number of RPC call (same as RpcQueueTimeNumOps) |
+| `RpcLockWaitTimeAvgTime` | Average time waiting for lock acquisition in milliseconds |
| `RpcProcessingTimeNumOps` | Total number of RPC calls (same to RpcQueueTimeNumOps) |
| `RpcProcessingAvgTime` | Average Processing time in milliseconds |
| `RpcAuthenticationFailures` | Total number of authentication failures |
@@ -92,6 +94,12 @@ Each metrics record contains tags such as Hostname and port (number to which ser
| `rpcProcessingTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcProcessingTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcProcessingTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC processing time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s50thPercentileLatency` | Shows the 50th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s75thPercentileLatency` | Shows the 75th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
RetryCache/NameNodeRetryCache
-----------------------------
@@ -118,6 +126,7 @@ rpcdetailed context
===================
Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds.
+Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime).
rpcdetailed
-----------
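The rpcLockWaitTime*num*s quantile gauges documented above are only emitted when RPC quantile metrics are enabled. A sketch of that configuration (key names taken from the table above; the values and class name here are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class EnableRpcQuantiles {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on quantile (percentile) metrics for the RPC layer.
        conf.setBoolean("rpc.metrics.quantile.enable", true);
        // Rolling window lengths in seconds; one gauge set is emitted per interval.
        conf.set("rpc.metrics.percentiles.intervals", "60,300");
        // ... pass this conf to the server process (e.g. the NameNode) at startup ...
      }
    }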
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProcessingDetails.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProcessingDetails.java
new file mode 100644
index 0000000..0ecc741
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProcessingDetails.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for ProcessingDetails time unit conversion and output.
+ */
+public class TestProcessingDetails {
+
+ /**
+ * Test that the conversion of time values in various units in and out of the
+ * details are done properly.
+ */
+ @Test
+ public void testTimeConversion() {
+ ProcessingDetails details = new ProcessingDetails(TimeUnit.MICROSECONDS);
+
+ details.set(Timing.ENQUEUE, 10);
+ assertEquals(10, details.get(Timing.ENQUEUE));
+ assertEquals(10_000, details.get(Timing.ENQUEUE, TimeUnit.NANOSECONDS));
+
+ details.set(Timing.QUEUE, 20, TimeUnit.MILLISECONDS);
+ details.add(Timing.QUEUE, 20, TimeUnit.MICROSECONDS);
+ assertEquals(20_020, details.get(Timing.QUEUE));
+ assertEquals(0, details.get(Timing.QUEUE, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testToString() {
+ ProcessingDetails details = new ProcessingDetails(TimeUnit.MICROSECONDS);
+ details.set(Timing.ENQUEUE, 10);
+ details.set(Timing.QUEUE, 20, TimeUnit.MILLISECONDS);
+
+ assertEquals("enqueueTime=10 queueTime=20000 handlerTime=0 " +
+ "processingTime=0 lockfreeTime=0 lockwaitTime=0 locksharedTime=0 " +
+ "lockexclusiveTime=0 responseTime=0", details.toString());
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
index 5fbd957..fd6a7ae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -41,6 +42,7 @@ import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -215,7 +217,8 @@ public class TestProtoBufRpc extends TestRpcBase {
}
@Test(timeout = 12000)
- public void testLogSlowRPC() throws IOException, ServiceException {
+ public void testLogSlowRPC() throws IOException, ServiceException,
+ TimeoutException, InterruptedException {
TestRpcService2 client = getClient2();
// make 10 K fast calls
for (int x = 0; x < 10000; x++) {
@@ -234,9 +237,9 @@ public class TestProtoBufRpc extends TestRpcBase {
// make a really slow call. Sleep sleeps for 1000ms
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
- long after = rpcMetrics.getRpcSlowCalls();
// Ensure slow call is logged.
- Assert.assertEquals(before + 1L, after);
+ GenericTestUtils.waitFor(()
+ -> rpcMetrics.getRpcSlowCalls() == before + 1L, 10, 1000);
}
@Test(timeout = 12000)
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 7b4f690..97bb954 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -87,6 +87,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
@@ -1072,10 +1074,14 @@ public class TestRPC extends TestRpcBase {
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
- assertTrue("Expected non-zero rpc queue time",
- getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0);
- assertTrue("Expected non-zero rpc processing time",
- getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0);
+ assertEquals("Expected correct rpc queue count",
+ 3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
+ assertEquals("Expected correct rpc processing count",
+ 3000, getLongCounter("RpcProcessingTimeNumOps", rpcMetrics));
+ assertEquals("Expected correct rpc lock wait count",
+ 3000, getLongCounter("RpcLockWaitTimeNumOps", rpcMetrics));
+ assertEquals("Expected zero rpc lock wait time",
+ 0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
@@ -1086,6 +1092,10 @@ public class TestRPC extends TestRpcBase {
UserGroupInformation.getCurrentUser().getShortUserName();
assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
+
+ proxy.lockAndSleep(null, newSleepRequest(5));
+ rpcMetrics = getMetrics(server.getRpcMetrics().name());
+ assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 0d2f975..2f2d36f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -21,12 +21,16 @@ package org.apache.hadoop.ipc;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.apache.hadoop.io.Text;
@@ -278,6 +282,7 @@ public class TestRpcBase {
public static class PBServerImpl implements TestRpcService {
CountDownLatch fastPingCounter = new CountDownLatch(2);
private List<Server.Call> postponedCalls = new ArrayList<>();
+ private final Lock lock = new ReentrantLock();
@Override
public TestProtos.EmptyResponseProto ping(RpcController unused,
@@ -389,6 +394,29 @@ public class TestRpcBase {
}
@Override
+ public TestProtos.EmptyResponseProto lockAndSleep(
+ RpcController controller, TestProtos.SleepRequestProto request)
+ throws ServiceException {
+ ProcessingDetails details =
+ Server.getCurCall().get().getProcessingDetails();
+ lock.lock();
+ long startNanos = Time.monotonicNowNanos();
+ try {
+ Thread.sleep(request.getMilliSeconds());
+ } catch (InterruptedException ignore) {
+ // ignore
+ } finally {
+ lock.unlock();
+ }
+ // Add some arbitrary large lock wait time since in any test scenario
+ // the lock wait time will probably actually be too small to notice
+ details.add(ProcessingDetails.Timing.LOCKWAIT, 10, TimeUnit.SECONDS);
+ details.add(ProcessingDetails.Timing.LOCKEXCLUSIVE,
+ Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
+ return TestProtos.EmptyResponseProto.newBuilder().build();
+ }
+
+ @Override
public TestProtos.AuthMethodResponseProto getAuthMethod(
RpcController controller, TestProtos.EmptyRequestProto request)
throws ServiceException {
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
index 3746411..0df67a0 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
@@ -39,6 +39,7 @@ service TestProtobufRpcProto {
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
+ rpc lockAndSleep(SleepRequestProto) returns (EmptyResponseProto);
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index 7c28465..98b4e6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
import org.apache.hadoop.util.StringUtils;
@@ -41,6 +42,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORT
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
/**
@@ -142,17 +144,11 @@ class FSNamesystemLock {
}
public void readLock() {
- coarseLock.readLock().lock();
- if (coarseLock.getReadHoldCount() == 1) {
- readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
- }
+ doLock(false);
}
public void readLockInterruptibly() throws InterruptedException {
- coarseLock.readLock().lockInterruptibly();
- if (coarseLock.getReadHoldCount() == 1) {
- readLockHeldTimeStampNanos.set(timer.monotonicNowNanos());
- }
+ doLockInterruptibly(false);
}
public void readUnlock() {
@@ -204,17 +200,11 @@ class FSNamesystemLock {
}
public void writeLock() {
- coarseLock.writeLock().lock();
- if (coarseLock.getWriteHoldCount() == 1) {
- writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
- }
+ doLock(true);
}
public void writeLockInterruptibly() throws InterruptedException {
- coarseLock.writeLock().lockInterruptibly();
- if (coarseLock.getWriteHoldCount() == 1) {
- writeLockHeldTimeStampNanos = timer.monotonicNowNanos();
- }
+ doLockInterruptibly(true);
}
/**
@@ -316,6 +306,50 @@ class FSNamesystemLock {
String overallMetric = getMetricName(OVERALL_METRIC_NAME, isWrite);
detailedHoldTimeMetrics.add(overallMetric, value);
}
+ updateProcessingDetails(
+ isWrite ? Timing.LOCKEXCLUSIVE : Timing.LOCKSHARED, value);
+ }
+
+ private void doLock(boolean isWrite) {
+ long startNanos = timer.monotonicNowNanos();
+ if (isWrite) {
+ coarseLock.writeLock().lock();
+ } else {
+ coarseLock.readLock().lock();
+ }
+ updateLockWait(startNanos, isWrite);
+ }
+
+ private void doLockInterruptibly(boolean isWrite)
+ throws InterruptedException {
+ long startNanos = timer.monotonicNowNanos();
+ if (isWrite) {
+ coarseLock.writeLock().lockInterruptibly();
+ } else {
+ coarseLock.readLock().lockInterruptibly();
+ }
+ updateLockWait(startNanos, isWrite);
+ }
+
+ private void updateLockWait(long startNanos, boolean isWrite) {
+ long now = timer.monotonicNowNanos();
+ updateProcessingDetails(Timing.LOCKWAIT, now - startNanos);
+ if (isWrite) {
+ if (coarseLock.getWriteHoldCount() == 1) {
+ writeLockHeldTimeStampNanos = now;
+ }
+ } else {
+ if (coarseLock.getReadHoldCount() == 1) {
+ readLockHeldTimeStampNanos.set(now);
+ }
+ }
+ }
+
+ private static void updateProcessingDetails(Timing type, long deltaNanos) {
+ Server.Call call = Server.getCurCall().get();
+ if (call != null) {
+ call.getProcessingDetails().add(type, deltaNanos, TimeUnit.NANOSECONDS);
+ }
}
private static String getMetricName(String operationName, boolean isWrite) {
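The FSNamesystemLock changes above show the general pattern this commit enables: any code executing inside an RPC handler can charge time to the call currently being served. A minimal, hypothetical sketch of the same pattern outside FSNamesystemLock (helper class and method names are illustrative):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.ipc.ProcessingDetails;
    import org.apache.hadoop.ipc.Server;
    import org.apache.hadoop.util.Time;

    public class LockTimingHelper {
      /** Charge elapsed lock-hold time to the RPC call currently being handled. */
      public static void chargeLockTime(long startNanos, boolean isWriteLock) {
        Server.Call call = Server.getCurCall().get();
        if (call == null) {
          return; // not running inside an RPC handler thread
        }
        call.getProcessingDetails().add(
            isWriteLock ? ProcessingDetails.Timing.LOCKEXCLUSIVE
                        : ProcessingDetails.Timing.LOCKSHARED,
            Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
      }
    }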
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 1ec47ca..5cd0fa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -374,11 +374,6 @@ public class TestConsistentReadsObserver {
}
@Override
- public void addResponseTime(String name, int priorityLevel, int queueTime,
- int processingTime) {
- }
-
- @Override
public void stop() {
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org