[prev in list] [next in list] [prev in thread] [next in thread]
List: rhq-commits
Subject: [rhq] Branch 'jay-avail' - modules/core
From: mazz <mazz () fedoraproject ! org>
Date: 2013-11-27 17:44:25
Message-ID: 20131127174425.0F6F76036E () fedorahosted ! org
[Download RAW message or body]
modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java \
| 72 +++++++--- modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java \
| 17 +- 2 files changed, 65 insertions(+), 24 deletions(-)
New commits:
commit 7fe7f7efb4d5754d8a0ebdccaa4604a450f3e0db
Author: John Mazzitelli <mazz@redhat.com>
Date: Wed Nov 27 12:44:18 2013 -0500
get avail proxy test to pass. we no longer assume that we need to abort if the \
first time we check if the future is done and it is not. instead, we check the time \
when the future was submitted. if its been under a certain time (1m by default), then \
we just return the last avail known to have been returned. otherwise, we timeout.
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java \
b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
index 4e14581..1bf380f 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
@@ -52,7 +52,7 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili
private static final Log LOG = LogFactory.getLog(AvailabilityProxy.class); // \
purposefully static, don't create one per proxy
/**
- * How long to wait for a resource to return their availability immediately (in \
ms). + * How long to wait for a resource to return their availability \
*immediately* (in ms).
* If a resource takes longer than this, then the number of timeouts is \
incremented, and then
* the container will just assume availability will be returned asynchronously \
for this resource.
*/
@@ -66,6 +66,15 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili
*/
private static final int AVAIL_SYNC_TIMEOUT_LIMIT;
+ /**
+ * How long to wait for an *async* future to return a resource availability (in \
ms). + * If a resource takes longer than this during an async call (via a thread \
from the executor thread pool) + * and another request comes in for the \
availability, then that async call will be canceled and a new + * one will be \
resubmitted, restarting the clock. This just helps clean up any hung threads waiting \
+ * for an availability that is just taking too much time to complete. + */
+ private static final int AVAIL_ASYNC_TIMEOUT;
+
static {
int syncAvailTimeout;
try {
@@ -86,6 +95,16 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili syncAvailTimeoutLimit = 5;
}
AVAIL_SYNC_TIMEOUT_LIMIT = syncAvailTimeoutLimit;
+
+ int asyncAvailTimeout;
+ try {
+ // unlikely to be changed but back-door configurable
+ asyncAvailTimeout = Integer.parseInt(System.getProperty(
+ "rhq.agent.plugins.availability-scan.async-timeout", "60000"));
+ } catch (Throwable t) {
+ asyncAvailTimeout = 60000;
+ }
+ AVAIL_ASYNC_TIMEOUT = asyncAvailTimeout;
}
private final AvailabilityFacet resourceComponent;
@@ -96,7 +115,9 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili
private volatile Thread current;
- private AvailabilityType last = UNKNOWN;
+ private long lastSubmitTime = 0;
+
+ private AvailabilityType lastAvail = UNKNOWN;
/**
* Number of consecutive avail sync timeouts for the resource. This value is \
reset if availability is @@ -149,25 +170,33 @@ public class AvailabilityProxy \
implements AvailabilityFacet, Callable<Availabili avail = availabilityFuture.get();
} else {
- // if the future is not done then it means this thread has been \
checking avail since the
- // last scheduled avail check. Not good. Throw a detailed \
exception to the avail checker..
- Throwable t = new Throwable();
- if (current != null) {
- t.setStackTrace(current.getStackTrace());
+ // We are still waiting on the previously submitted async avail \
check - let's just return + // the last one we got. Note that if \
the future is not done after a large amount of time, + // then it \
means this thread could somehow be hung or otherwise stuck and not returning. Not \
good. + // In this case, throw a detailed exception to the avail \
checker. + if ((System.currentTimeMillis() - lastSubmitTime) > \
AVAIL_ASYNC_TIMEOUT) { + Throwable t = new Throwable();
+ if (current != null) {
+ t.setStackTrace(current.getStackTrace());
+ }
+ String msg = "Availability check running too long, canceled \
for [" + resourceComponent + + "]; Stack trace includes \
the timed out thread's stack trace."; + \
availabilityFuture.cancel(true); +
+ // try again, maybe the situation will resolve in time for \
the next check + availabilityFuture = executor.submit(this);
+ lastSubmitTime = System.currentTimeMillis();
+
+ throw new TimeoutException(msg, t);
+ } else {
+ return lastAvail;
}
- String msg = "Availability check running too long, canceled for \
" + resourceComponent
- + "; Stack trace includes the timed out thread's stack \
trace.";
- availabilityFuture.cancel(true);
-
- // try again, maybe the situation will resolve in time for the \
next check
- availabilityFuture = executor.submit(this);
-
- throw new TimeoutException(msg, t);
}
}
// request a thread to do an avail check
availabilityFuture = executor.submit(this);
+ lastSubmitTime = System.currentTimeMillis();
// if we have exceeded the timeout too many times in a row assume that \
this is a slow
// resource and stop performing synchronous checks, which would likely \
fail to return fast enough anyway. @@ -213,7 +242,7 @@ public class AvailabilityProxy \
implements AvailabilityFacet, Callable<Availabili break;
default:
if (LOG.isDebugEnabled()) {
- LOG.debug("ResourceComponent " + resourceComponent + " \
getAvailability() returned " + type + LOG.debug("ResourceComponent [" \
+ resourceComponent + "] getAvailability() returned " + type
+ ". This is invalid and is being replaced with DOWN.");
}
result = DOWN;
@@ -222,18 +251,18 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili
// whenever changing to UP we reset the timeout counter. This is because \
DOWN resources often respond
// slowly to getAvailability() calls (for example, waiting for a connection \
attempt to time out). When a
// resource comes up we should give it a chance to respond quickly and \
provide live avail.
- if (result != last) {
+ if (result != lastAvail) {
if (result == UP) {
if (availAsyncConsecutiveTimeouts >= AVAIL_SYNC_TIMEOUT_LIMIT) {
if (LOG.isDebugEnabled()) {
LOG.debug("Enabling synchronous availability collection for \
[" + resourceComponent
- + "]; Availability has just changed from [" + last + "] \
to UP."); + + "]; Availability has just changed from [" + \
lastAvail + "] to UP."); }
}
availAsyncConsecutiveTimeouts = 0;
}
- last = result;
+ lastAvail = result;
}
return result;
@@ -244,8 +273,9 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili
*/
@Override
public String toString() {
- return "AvailabilityProxy [resourceComponent=" + resourceComponent + ", \
executor=" + executor
- + ", availabilityFuture=" + availabilityFuture + ", current=" + current \
+ ", timeouts=" + return "AvailabilityProxy [resourceComponent=" + \
resourceComponent + ", lastAvail=" + lastAvail + + ", lastSubmitTime=" + \
new java.util.Date(lastSubmitTime) + ", executor=" + + executor + ", \
availabilityFuture=" + availabilityFuture + ", current=" + current + ", timeouts=" + \
availAsyncConsecutiveTimeouts + "]"; }
}
diff --git a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java \
b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
index f2880dd..06753ca 100644
--- a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
+++ b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
@@ -26,15 +26,18 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.testng.annotations.Test;
import org.rhq.core.domain.measurement.AvailabilityType;
import org.rhq.core.pluginapi.availability.AvailabilityFacet;
-@Test(enabled = false)
+@Test
public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet {
+ private AtomicInteger numberOfFacetCalls = new AtomicInteger(-1);
+
public void testConcurrentAvailChecks() throws Exception {
Thread.interrupted(); // clear any hanging around interrupt status
@@ -48,7 +51,7 @@ public class AvailabilityProxyConcurrencyTest implements \
AvailabilityFacet {
assert UP.equals(firstAvail) : "Can't even get our first avail \
correctly: " + firstAvail;
// create several threads that will concurrently call getAvailability
- final int numThreads = 2;
+ final int numThreads = 10;
final Hashtable<String, AvailabilityType> availResults = new \
Hashtable<String, AvailabilityType>(numThreads);
final Hashtable<String, Date> dateResults = new Hashtable<String, \
Date>(numThreads);
final Hashtable<String, Throwable> throwableResults = new \
Hashtable<String, Throwable>(numThreads); @@ -68,6 +71,7 @@ public class \
AvailabilityProxyConcurrencyTest implements AvailabilityFacet { }
}
};
+ numberOfFacetCalls.set(0); // this will count how many times the proxy \
actually calls the facet getAvail method for (int i = 0; i < numThreads; i++) {
Thread t = new Thread(runnable, "t" + i);
t.start();
@@ -86,6 +90,12 @@ public class AvailabilityProxyConcurrencyTest implements \
AvailabilityFacet { for (AvailabilityType availtype : availResults.values()) {
assert availtype.equals(UP) : "Failed, bad avail: availResults = " + \
availResults; }
+
+ // make sure we actually tested the code we need to test - we should not \
be making + // individual facet calls for each request because we shotgun \
the requests so fast, + // and the facet sleeps so long, that the proxy \
should return the last avail rather + // than requiring a new facet call.
+ assert (numberOfFacetCalls.get()) < numThreads : numberOfFacetCalls;
} finally {
executor.shutdownNow();
}
@@ -94,7 +104,8 @@ public class AvailabilityProxyConcurrencyTest implements \
AvailabilityFacet { @Override
public synchronized AvailabilityType getAvailability() {
try {
- Thread.sleep(750);
+ System.out.println("~~~AVAILABILITY FACET CALL #" + \
numberOfFacetCalls.incrementAndGet()); + Thread.sleep(250); // just make \
it slow enough so a few proxy calls are done concurrently while this method is \
running } catch (Exception e) {
System.out.println("~~~AVAILABILITY SLEEP WAS ABORTED: " + e);
}
_______________________________________________
rhq-commits mailing list
rhq-commits@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/rhq-commits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic