[prev in list] [next in list] [prev in thread] [next in thread] 

List:       rhq-commits
Subject:    [rhq] Branch 'jay-avail' - modules/core
From:       mazz <mazz () fedoraproject ! org>
Date:       2013-11-27 17:44:25
Message-ID: 20131127174425.0F6F76036E () fedorahosted ! org
[Download RAW message or body]

 modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java \
|   72 +++++++---  modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java \
|   17 +-  2 files changed, 65 insertions(+), 24 deletions(-)

New commits:
commit 7fe7f7efb4d5754d8a0ebdccaa4604a450f3e0db
Author: John Mazzitelli <mazz@redhat.com>
Date:   Wed Nov 27 12:44:18 2013 -0500

    get avail proxy test to pass. we no longer assume that we need to abort if the \
future is not yet done the first time we check it.  instead, we check the time \
when the future was submitted. if it's been under a certain time (1m by default), then \
we just return the last avail known to have been returned. otherwise, we time out.

diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java \
b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
 index 4e14581..1bf380f 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
                
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
 @@ -52,7 +52,7 @@ public class AvailabilityProxy implements AvailabilityFacet, \
                Callable<Availabili
     private static final Log LOG = LogFactory.getLog(AvailabilityProxy.class); // \
purposefully static, don't create one per proxy  
     /**
-     * How long to wait for a resource to return their availability immediately (in \
ms). +     * How long to wait for a resource to return their availability \
                *immediately* (in ms).
      * If a resource takes longer than this, then the number of timeouts is \
                incremented, and then
      * the container will just assume availability will be returned asynchronously \
                for this resource.
      */
@@ -66,6 +66,15 @@ public class AvailabilityProxy implements AvailabilityFacet, \
                Callable<Availabili
      */
     private static final int AVAIL_SYNC_TIMEOUT_LIMIT;
 
+    /**
+     * How long to wait for an *async* future to return a resource availability (in \
ms). +     * If a resource takes longer than this during an async call (via a thread \
from the executor thread pool) +     * and another request comes in for the \
availability, then that async call will be canceled and a new +     * one will be \
resubmitted, restarting the clock. This just helps clean up any hung threads waiting \
+     * for an availability that is just taking too much time to complete. +     */
+    private static final int AVAIL_ASYNC_TIMEOUT;
+
     static {
         int syncAvailTimeout;
         try {
@@ -86,6 +95,16 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili  syncAvailTimeoutLimit = 5;
         }
         AVAIL_SYNC_TIMEOUT_LIMIT = syncAvailTimeoutLimit;
+
+        int asyncAvailTimeout;
+        try {
+            // unlikely to be changed but back-door configurable
+            asyncAvailTimeout = Integer.parseInt(System.getProperty(
+                "rhq.agent.plugins.availability-scan.async-timeout", "60000"));
+        } catch (Throwable t) {
+            asyncAvailTimeout = 60000;
+        }
+        AVAIL_ASYNC_TIMEOUT = asyncAvailTimeout;
     }
 
     private final AvailabilityFacet resourceComponent;
@@ -96,7 +115,9 @@ public class AvailabilityProxy implements AvailabilityFacet, \
Callable<Availabili  
     private volatile Thread current;
 
-    private AvailabilityType last = UNKNOWN;
+    private long lastSubmitTime = 0;
+
+    private AvailabilityType lastAvail = UNKNOWN;
 
     /**
      * Number of consecutive avail sync timeouts for the resource. This value is \
reset if availability is @@ -149,25 +170,33 @@ public class AvailabilityProxy \
implements AvailabilityFacet, Callable<Availabili  avail = availabilityFuture.get();
 
                 } else {
-                    // if the future is not done then it means this thread has been \
                checking avail since the
-                    // last scheduled avail check. Not good.  Throw a detailed \
                exception to the avail checker..
-                    Throwable t = new Throwable();
-                    if (current != null) {
-                        t.setStackTrace(current.getStackTrace());
+                    // We are still waiting on the previously submitted async avail \
check - let's just return +                    // the last one we got. Note that if \
the future is not done after a large amount of time, +                    // then it \
means this thread could somehow be hung or otherwise stuck and not returning. Not \
good. +                    // In this case, throw a detailed exception to the avail \
checker. +                    if ((System.currentTimeMillis() - lastSubmitTime) > \
AVAIL_ASYNC_TIMEOUT) { +                        Throwable t = new Throwable();
+                        if (current != null) {
+                            t.setStackTrace(current.getStackTrace());
+                        }
+                        String msg = "Availability check running too long, canceled \
for [" + resourceComponent +                            + "]; Stack trace includes \
the timed out thread's stack trace."; +                        \
availabilityFuture.cancel(true); +
+                        // try again, maybe the situation will resolve in time for \
the next check +                        availabilityFuture = executor.submit(this);
+                        lastSubmitTime = System.currentTimeMillis();
+
+                        throw new TimeoutException(msg, t);
+                    } else {
+                        return lastAvail;
                     }
-                    String msg = "Availability check running too long, canceled for \
                " + resourceComponent
-                        + "; Stack trace includes the timed out thread's stack \
                trace.";
-                    availabilityFuture.cancel(true);
-
-                    // try again, maybe the situation will resolve in time for the \
                next check
-                    availabilityFuture = executor.submit(this);
-
-                    throw new TimeoutException(msg, t);
                 }
             }
 
             // request a thread to do an avail check
             availabilityFuture = executor.submit(this);
+            lastSubmitTime = System.currentTimeMillis();
 
             // if we have exceeded the timeout too many times in a row assume that \
                this is a slow
             // resource and stop performing synchronous checks, which would likely \
fail to return fast enough anyway. @@ -213,7 +242,7 @@ public class AvailabilityProxy \
implements AvailabilityFacet, Callable<Availabili  break;
         default:
             if (LOG.isDebugEnabled()) {
-                LOG.debug("ResourceComponent " + resourceComponent + " \
getAvailability() returned " + type +                LOG.debug("ResourceComponent [" \
                + resourceComponent + "] getAvailability() returned " + type
                     + ". This is invalid and is being replaced with DOWN.");
             }
             result = DOWN;
@@ -222,18 +251,18 @@ public class AvailabilityProxy implements AvailabilityFacet, \
                Callable<Availabili
         // whenever changing to UP we reset the timeout counter.  This is because \
                DOWN resources often respond
         // slowly to getAvailability() calls (for example, waiting for a connection \
                attempt to time out).  When a
         // resource comes up we should give it a chance to respond quickly and \
                provide live avail.
-        if (result != last) {
+        if (result != lastAvail) {
             if (result == UP) {
                 if (availAsyncConsecutiveTimeouts >= AVAIL_SYNC_TIMEOUT_LIMIT) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Enabling synchronous availability collection for \
                [" + resourceComponent
-                            + "]; Availability has just changed from [" + last + "] \
to UP."); +                            + "]; Availability has just changed from [" + \
lastAvail + "] to UP.");  }
                 }
                 availAsyncConsecutiveTimeouts = 0;
 
             }
-            last = result;
+            lastAvail = result;
         }
 
         return result;
@@ -244,8 +273,9 @@ public class AvailabilityProxy implements AvailabilityFacet, \
                Callable<Availabili
      */
     @Override
     public String toString() {
-        return "AvailabilityProxy [resourceComponent=" + resourceComponent + ", \
                executor=" + executor
-            + ", availabilityFuture=" + availabilityFuture + ", current=" + current \
+ ", timeouts=" +        return "AvailabilityProxy [resourceComponent=" + \
resourceComponent + ", lastAvail=" + lastAvail +            + ", lastSubmitTime=" + \
new java.util.Date(lastSubmitTime) + ", executor=" +            + executor + ", \
availabilityFuture=" + availabilityFuture + ", current=" + current + ", timeouts="  + \
availAsyncConsecutiveTimeouts + "]";  }
 }
diff --git a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java \
b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
 index f2880dd..06753ca 100644
--- a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
                
+++ b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
 @@ -26,15 +26,18 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.testng.annotations.Test;
 
 import org.rhq.core.domain.measurement.AvailabilityType;
 import org.rhq.core.pluginapi.availability.AvailabilityFacet;
 
-@Test(enabled = false)
+@Test
 public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet {
 
+    private AtomicInteger numberOfFacetCalls = new AtomicInteger(-1);
+
     public void testConcurrentAvailChecks() throws Exception {
         Thread.interrupted(); // clear any hanging around interrupt status
 
@@ -48,7 +51,7 @@ public class AvailabilityProxyConcurrencyTest implements \
                AvailabilityFacet {
             assert UP.equals(firstAvail) : "Can't even get our first avail \
correctly: " + firstAvail;  
             // create several threads that will concurrently call getAvailability
-            final int numThreads = 2;
+            final int numThreads = 10;
             final Hashtable<String, AvailabilityType> availResults = new \
                Hashtable<String, AvailabilityType>(numThreads);
             final Hashtable<String, Date> dateResults = new Hashtable<String, \
                Date>(numThreads);
             final Hashtable<String, Throwable> throwableResults = new \
Hashtable<String, Throwable>(numThreads); @@ -68,6 +71,7 @@ public class \
AvailabilityProxyConcurrencyTest implements AvailabilityFacet {  }
                 }
             };
+            numberOfFacetCalls.set(0); // this will count how many times the proxy \
actually calls the facet getAvail method  for (int i = 0; i < numThreads; i++) {
                 Thread t = new Thread(runnable, "t" + i);
                 t.start();
@@ -86,6 +90,12 @@ public class AvailabilityProxyConcurrencyTest implements \
AvailabilityFacet {  for (AvailabilityType availtype : availResults.values()) {
                 assert availtype.equals(UP) : "Failed, bad avail: availResults = " + \
availResults;  }
+
+            // make sure we actually tested the code we need to test - we should not \
be making +            // individual facet calls for each request because we shotgun \
the requests so fast, +            // and the facet sleeps so long, that the proxy \
should return the last avail rather +            // than requiring a new facet call.
+            assert (numberOfFacetCalls.get()) < numThreads : numberOfFacetCalls;
         } finally {
             executor.shutdownNow();
         }
@@ -94,7 +104,8 @@ public class AvailabilityProxyConcurrencyTest implements \
AvailabilityFacet {  @Override
     public synchronized AvailabilityType getAvailability() {
         try {
-            Thread.sleep(750);
+            System.out.println("~~~AVAILABILITY FACET CALL #" + \
numberOfFacetCalls.incrementAndGet()); +            Thread.sleep(250); // just make \
it slow enough so a few proxy calls are done concurrently while this method is \
running  } catch (Exception e) {
             System.out.println("~~~AVAILABILITY SLEEP WAS ABORTED: " + e);
         }


_______________________________________________
rhq-commits mailing list
rhq-commits@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/rhq-commits


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic