[prev in list] [next in list] [prev in thread] [next in thread]
List: httpcomponents-commits
Subject: svn commit: r559958 - in
From: rolandw () apache ! org
Date: 2007-07-26 19:53:08
Message-ID: 20070726195310.430301A981D () eris ! apache ! org
[Download RAW message or body]
Author: rolandw
Date: Thu Jul 26 12:53:07 2007
New Revision: 559958
URL: http://svn.apache.org/viewvc?view=rev&rev=559958
Log:
factored out TSCCM.ConnectionPool into separate classes
Added:
jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java \
(with props) jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java \
(with props) Modified:
jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/BadStaticMaps.java
jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ThreadSafeClientConnManager.java
Added: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpclient/trunk/module-clien \
t/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java?view=auto&rev=559958
==============================================================================
--- jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java \
(added)
+++ jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java \
Thu Jul 26 12:53:07 2007 @@ -0,0 +1,268 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.impl.conn.tsccm;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.ClientConnectionOperator;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.HttpRoute;
+import org.apache.http.conn.ManagedClientConnection;
+import org.apache.http.conn.OperatedClientConnection;
+import org.apache.http.conn.SchemeRegistry;
+import org.apache.http.conn.params.HttpConnectionManagerParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.impl.conn.*; //@@@ specify
+
+
+/**
+ * An abstract connection pool.
+ * It is used by the {@link ThreadSafeClientConnManager}.
+ */
+public abstract class AbstractConnPool implements RefQueueHandler {
+
+ //@@@ protected, obtain with getClass()?
+ private final Log LOG = LogFactory.getLog(AbstractConnPool.class);
+
+ /**
+ * References to issued connections.
+ * Objects in this set are of class
+ * {@link BasicPoolEntryRef BasicPoolEntryRef},
+ * and point to the pool entry for the issued connection.
+ * GCed connections are detected by the missing pool entries.
+ */
+ protected Set issuedConnections;
+
+ /** The handler for idle connections. */
+ protected IdleConnectionHandler idleConnHandler;
+
+ /** The current total number of connections. */
+ protected int numConnections;
+
+ /** The parameters of this connection pool. */
+ //@@@ allow get/set? synchronized?
+ //@@@ currently needed for connection limits
+ protected HttpParams params;
+
+
+ /** The connection manager. */
+ //@@@ replace with a weak reference to allow for GC
+ //@@@ is it necessary to have the manager in the pool entry?
+ protected ThreadSafeClientConnManager connManager;
+
+
+ /** A reference queue to track loss of pool entries to GC. */
+ //@@@ this should be a pool-specific reference queue
+ protected ReferenceQueue refQueue = BadStaticMaps.REFERENCE_QUEUE; //@@@
+
+ /** A worker (thread) to track loss of pool entries to GC. */
+ private RefQueueWorker refWorker;
+
+
+ /** Indicates whether this pool is shut down. */
+ protected volatile boolean isShutDown;
+
+
+
+ /**
+ * Creates a new connection pool.
+ *
+ * @param tsccm the connection manager
+ */
+ protected AbstractConnPool(ThreadSafeClientConnManager tsccm) {
+
+ connManager = tsccm;
+ params = tsccm.getParams();
+
+ issuedConnections = new HashSet();
+ idleConnHandler = new IdleConnectionHandler();
+
+ //@@@ currently must be false, otherwise the TSCCM
+ //@@@ will not be garbage collected in the unit test...
+ boolean conngc = false; //@@@ check parameters to decide
+ if (conngc) {
+ refQueue = new ReferenceQueue();
+ refWorker = new RefQueueWorker(refQueue, this);
+ Thread t = new Thread(refWorker); //@@@ use a thread factory
+ t.setDaemon(true);
+ t.setName("RefQueueWorker@" + this);
+ t.start();
+ }
+ }
+
+
+ /**
+ * Obtains a pool entry with a connection within the given timeout.
+ *
+ * @param route the route for which to get the connection
+ * @param timeout the timeout, or 0 for no timeout
+ * @param operator the connection operator, in case
+ * a connection has to be created
+ *
+ * @return pool entry holding a connection for the route
+ *
+ * @throws ConnectionPoolTimeoutException
+ * if the timeout expired
+ */
+ public abstract
+ BasicPoolEntry getEntry(HttpRoute route, long timeout,
+ ClientConnectionOperator operator)
+ throws ConnectionPoolTimeoutException
+ ;
+
+
+ /**
+ * Returns an entry into the pool.
+ * The connection of the entry is expected to be in a suitable state,
+ * either open and re-usable, or closed. The pool will not make any
+ * attempt to determine whether it can be re-used or not.
+ *
+ * @param entry the entry for the connection to release
+ */
+ public abstract void freeEntry(BasicPoolEntry entry)
+ ;
+
+
+
+ // non-javadoc, see interface RefQueueHandler
+ public synchronized void handleReference(Reference ref) {
+
+ if (ref instanceof BasicPoolEntryRef) {
+ // check if the GCed pool entry was still in use
+ //@@@ find a way to detect this without lookup
+ //@@@ flag in the BasicPoolEntryRef, to be reset when freed?
+ final boolean lost = issuedConnections.remove(ref);
+ if (lost) {
+ final HttpRoute route = ((BasicPoolEntryRef)ref).getRoute();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection garbage collected. " + route);
+ }
+ handleLostEntry(route);
+ }
+ }
+ //@@@ else check if the connection manager was GCed
+ }
+
+
+ /**
+ * Handles cleaning up for a lost pool entry with the given route.
+ * A lost pool entry corresponds to a connection that was
+ * garbage collected instead of being properly released.
+ *
+ * @param route the route of the pool entry that was lost
+ */
+ protected abstract void handleLostEntry(HttpRoute route)
+ ;
+
+
+ /**
+ * Closes idle connections.
+ *
+ * @param idletime the time the connections should have been idle
+ * in order to be closed now
+ */
+ public synchronized void closeIdleConnections(long idletime) {
+ idleConnHandler.closeIdleConnections(idletime);
+ }
+
+ //@@@ revise this cleanup stuff (closeIdle+deleteClosed), it's not good
+
+ /**
+ * Deletes all entries for closed connections.
+ */
+ public abstract void deleteClosedConnections()
+ ;
+
+
+ /**
+ * Shuts down this pool and all associated resources.
+ * Overriding methods MUST call the implementation here!
+ */
+ public synchronized void shutdown() {
+
+ isShutDown = true;
+
+ // no point in monitoring GC anymore
+ if (refWorker != null)
+ refWorker.shutdown();
+
+ // close all connections that are issued to an application
+ Iterator iter = issuedConnections.iterator();
+ while (iter.hasNext()) {
+ BasicPoolEntryRef per = (BasicPoolEntryRef) iter.next();
+ iter.remove();
+ BasicPoolEntry entry = (BasicPoolEntry) per.get();
+ if (entry != null) {
+ closeConnection(entry.getConnection());
+ }
+ }
+ //@@@ while the static map exists, call there to clean it up
+ BadStaticMaps.shutdownCheckedOutConnections(this); //@@@
+
+ // remove all references to connections
+ //@@@ use this for shutting them down instead?
+ idleConnHandler.removeAll();
+ }
+
+
+ /**
+ * Closes a connection from this pool.
+ *
+ * @param conn the connection to close, or <code>null</code>
+ */
+ protected void closeConnection(final OperatedClientConnection conn) {
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (IOException ex) {
+ LOG.debug("I/O error closing connection", ex);
+ }
+ }
+ }
+
+
+
+} // class AbstractConnPool
+
Propchange: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/AbstractConnPool.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/BadStaticMaps.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpclient/trunk/module-clien \
t/src/main/java/org/apache/http/impl/conn/tsccm/BadStaticMaps.java?view=diff&rev=559958&r1=559957&r2=559958
==============================================================================
--- jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/BadStaticMaps.java \
(original)
+++ jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/BadStaticMaps.java \
Thu Jul 26 12:53:07 2007 @@ -145,7 +145,7 @@
static /*default*/ void storeReferenceToConnection(
BasicPoolEntry entry,
HttpRoute route,
- ThreadSafeClientConnManager.ConnectionPool connectionPool
+ AbstractConnPool connectionPool
) {
ConnectionSource source = new ConnectionSource();
@@ -187,7 +187,7 @@
* @param connectionPool the pool for which to shutdown the connections
*/
static /*default*/
- void shutdownCheckedOutConnections(ThreadSafeClientConnManager.ConnectionPool \
connectionPool) { + void shutdownCheckedOutConnections(AbstractConnPool \
connectionPool) {
// keep a list of the connections to be closed
ArrayList connectionsToClose = new ArrayList();
@@ -227,7 +227,7 @@
private static class ConnectionSource {
/** The connection pool that created the connection */
- public ThreadSafeClientConnManager.ConnectionPool connectionPool;
+ public AbstractConnPool connectionPool;
/** The connection's planned route. */
public HttpRoute route;
@@ -276,7 +276,7 @@
+ source.route);
}
- source.connectionPool.handleLostConnection(source.route);
+ source.connectionPool.handleLostEntry(source.route);
}
}
Added: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpclient/trunk/module-clien \
t/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java?view=auto&rev=559958
==============================================================================
--- jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java \
(added)
+++ jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java \
Thu Jul 26 12:53:07 2007 @@ -0,0 +1,597 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.impl.conn.tsccm;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.ClientConnectionOperator;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.HttpRoute;
+import org.apache.http.conn.ManagedClientConnection;
+import org.apache.http.conn.OperatedClientConnection;
+import org.apache.http.conn.SchemeRegistry;
+import org.apache.http.conn.params.HttpConnectionManagerParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.impl.conn.*; //@@@ specify
+
+
+/**
+ * A connection pool that maintains connections by route.
+ * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
+ * in HttpClient 3.x, see there for original authors. It implements the same
+ * algorithm for connection re-use and connection-per-host enforcement:
+ * <ul>
+ * <li>connections are re-used only for the exact same route</li>
+ * <li>connection limits are enforced per route rather than per host</li>
+ * </ul>
+ *
+ * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
+ * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
+ * @author and others
+ */
+public class ConnPoolByRoute extends AbstractConnPool {
+
+ //@@@ use a protected LOG in the base class?
+ private final Log LOG = LogFactory.getLog(ConnPoolByRoute.class);
+
+
+ /** The list of free connections */
+ private LinkedList freeConnections;
+
+ /** The list of WaitingThreads waiting for a connection */
+ private LinkedList waitingThreads;
+
+ /**
+ * A map of route-specific pools.
+ * Keys are of class {@link HttpRoute},
+ * values of class {@link RouteConnPool}.
+ */
+ private final Map routeToPool;
+
+
+ /**
+ * A simple struct-like class to combine the connection list
+ * and the count of created connections.
+ */
+ protected static class RouteConnPool {
+
+ /** The route this pool is for. */
+ public final HttpRoute route;
+
+ /** The list of free connections. */
+ public LinkedList freeConnections;
+
+ /** The list of WaitingThreads for this pool. */
+ public LinkedList waitingThreads;
+
+ /** The number of created connections. */
+ public int numConnections;
+
+ /**
+ * Creates a new route-specific pool.
+ *
+ * @param r the route for which to pool
+ */
+ public RouteConnPool(HttpRoute r) {
+ this.route = r;
+ this.freeConnections = new LinkedList();
+ this.waitingThreads = new LinkedList();
+ this.numConnections = 0;
+ }
+ } // class RouteConnPool
+
+
+ /**
+ * A thread and the pool in which it is waiting.
+ */
+ private static class WaitingThread {
+
+ /** The thread that is waiting for a connection */
+ public Thread thread;
+
+ /** The connection pool the thread is waiting for */
+ public RouteConnPool pool;
+
+ /**
+ * Indicates the source of an interruption.
+ * Set to <code>true</code> inside
+ * {@link #notifyWaitingThread(RouteConnPool)}
+ * and {@link #shutdown shutdown()}
+ * before the thread is interrupted.
+ * If not set, the thread was interrupted from the outside.
+ */
+ public boolean interruptedByConnectionPool = false;
+ }
+
+
+
+ /**
+ * Creates a new connection pool, managed by route.
+ *
+ * @param tsccm the connection manager
+ */
+ public ConnPoolByRoute(ThreadSafeClientConnManager tsccm) {
+ super(tsccm);
+
+ freeConnections = new LinkedList();
+ waitingThreads = new LinkedList();
+ routeToPool = new HashMap();
+ }
+
+
+ /**
+ * Get a route-specific pool of available connections.
+ *
+ * @param route the route
+ *
+ * @return the pool for the argument route, never <code>null</code>
+ */
+ protected synchronized RouteConnPool getRoutePool(HttpRoute route) {
+
+ RouteConnPool rcp = (RouteConnPool) routeToPool.get(route);
+ if (rcp == null) {
+ // no pool for this route yet (or anymore)
+ rcp = newRouteConnPool(route);
+ routeToPool.put(route, rcp);
+ }
+
+ return rcp;
+ }
+
+
+ /**
+ * Creates a new route-specific pool.
+ * Called by {@link #getRoutePool getRoutePool}, if necessary.
+ *
+ * @param route the route
+ *
+ * @return the new pool
+ */
+ protected RouteConnPool newRouteConnPool(HttpRoute route) {
+ return new RouteConnPool(route);
+ }
+
+
+ //@@@ consider alternatives for gathering statistics
+ public synchronized int getConnectionsInPool(HttpRoute route) {
+ //@@@ don't allow a pool to be created here!
+ RouteConnPool rcp = getRoutePool(route);
+ return rcp.numConnections;
+ }
+
+
+ // non-javadoc, see base class AbstractConnPool
+ //@@@ can we keep the operator out of here and simply return a
+ //@@@ pool entry without a connection, to be filled in by the manager?
+ public synchronized
+ BasicPoolEntry getEntry(HttpRoute route, long timeout,
+ ClientConnectionOperator operator)
+ throws ConnectionPoolTimeoutException {
+
+ BasicPoolEntry entry = null;
+
+ int maxHostConnections = HttpConnectionManagerParams
+ .getMaxConnectionsPerHost(this.params, route);
+ int maxTotalConnections = HttpConnectionManagerParams
+ .getMaxTotalConnections(this.params);
+
+ RouteConnPool routePool = getRoutePool(route);
+ WaitingThread waitingThread = null;
+
+ boolean useTimeout = (timeout > 0);
+ long timeToWait = timeout;
+ long startWait = 0;
+ long endWait = 0;
+
+ while (entry == null) {
+
+ if (isShutDown) {
+ throw new IllegalStateException
+ ("Connection pool shut down.");
+ }
+
+ // the cases to check for:
+ // - have a free connection for that route
+ // - allowed to create a free connection for that route
+ // - can delete and replace a free connection for another route
+ // - need to wait for one of the things above to come true
+
+ if (routePool.freeConnections.size() > 0) {
+ //@@@ why pass the route, we already have the pool?
+ entry = getFreeEntry(route);
+
+ } else if ((routePool.numConnections < maxHostConnections) &&
+ (numConnections < maxTotalConnections)) {
+
+ //@@@ why pass the route, we already have the pool?
+ entry = createEntry(route, operator);
+
+ } else if ((routePool.numConnections < maxHostConnections) &&
+ (freeConnections.size() > 0)) {
+
+ deleteLeastUsedEntry();
+ //@@@ why pass the route, we already have the pool?
+ entry = createEntry(route, operator);
+
+ } else {
+ // TODO: keep track of which routes have waiting
+ // threads, so they avoid being sacrificed before necessary
+
+ try {
+ if (useTimeout && timeToWait <= 0) {
+ throw new ConnectionPoolTimeoutException
+ ("Timeout waiting for connection");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Need to wait for connection. " + route);
+ }
+
+ if (waitingThread == null) {
+ waitingThread = new WaitingThread();
+ waitingThread.pool = routePool;
+ waitingThread.thread = Thread.currentThread();
+ } else {
+ waitingThread.interruptedByConnectionPool = false;
+ }
+
+ if (useTimeout) {
+ startWait = System.currentTimeMillis();
+ }
+
+ routePool.waitingThreads.addLast(waitingThread);
+ waitingThreads.addLast(waitingThread);
+ wait(timeToWait);
+
+ } catch (InterruptedException e) {
+ if (!waitingThread.interruptedByConnectionPool) {
+ LOG.debug("Interrupted while waiting for connection.", e);
+ throw new IllegalThreadStateException(
+ "Interrupted while waiting in " + this);
+ }
+ // Else, do nothing, we were interrupted by the
+ // connection pool and should now have a connection
+ // waiting for us. Continue in the loop and get it.
+ // Or else we are shutting down, which is also
+ // detected in the loop.
+ } finally {
+ if (!waitingThread.interruptedByConnectionPool) {
+ // Either we timed out, experienced a
+ // "spurious wakeup", or were interrupted by an
+ // external thread. Regardless we need to
+ // cleanup for ourselves in the wait queue.
+ routePool.waitingThreads.remove(waitingThread);
+ waitingThreads.remove(waitingThread);
+ }
+
+ if (useTimeout) {
+ endWait = System.currentTimeMillis();
+ timeToWait -= (endWait - startWait);
+ }
+ }
+ }
+ } // while no entry
+
+ return entry;
+
+ } // getEntry
+
+
+ // non-javadoc, see base class AbstractConnPool
+ public synchronized void freeEntry(BasicPoolEntry entry) {
+
+ HttpRoute route = entry.getPlannedRoute();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Freeing connection. " + route);
+ }
+
+ if (isShutDown) {
+ // the pool is shut down, release the
+ // connection's resources and get out of here
+ closeConnection(entry.getConnection());
+ return;
+ }
+
+ RouteConnPool routePool = getRoutePool(route);
+
+ // Put the connection back in the available list
+ // and notify a waiter
+ routePool.freeConnections.add(entry);
+ if (routePool.numConnections == 0) {
+ // for some reason the route pool didn't already exist
+ LOG.error("Route connection pool not found. " + route);
+ routePool.numConnections = 1;
+ }
+ freeConnections.add(entry);
+
+ // We can remove the reference to this connection as we have
+ // control over it again. This also ensures that the connection
+ // manager can be GCed.
+ BadStaticMaps.removeReferenceToConnection(entry); //@@@
+ issuedConnections.remove(entry.getWeakRef()); //@@@ move above
+ if (numConnections == 0) {
+ // for some reason this pool didn't already exist
+ LOG.error("Master connection pool not found. " + route);
+ numConnections = 1;
+ }
+
+ // register the connection with the timeout handler
+ idleConnHandler.add(entry.getConnection());
+
+ notifyWaitingThread(routePool);
+
+ } // freeEntry
+
+
+
+ /**
+ * If available, get a free pool entry for a route.
+ *
+ * @param route the planned route
+ *
+ * @return an available pool entry for the given route
+ */
+ protected synchronized BasicPoolEntry getFreeEntry(HttpRoute route) {
+
+ BasicPoolEntry entry = null;
+
+ RouteConnPool routePool = getRoutePool(route);
+
+ if (routePool.freeConnections.size() > 0) {
+ entry = (BasicPoolEntry) routePool.freeConnections.removeLast();
+ freeConnections.remove(entry);
+
+ // store a reference to this entry so that it can be cleaned up
+ // in the event it is not correctly released
+ BadStaticMaps.storeReferenceToConnection(entry, route, this); //@@@
+ issuedConnections.add(entry.getWeakRef());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting free connection. " + route);
+ }
+ idleConnHandler.remove(entry.getConnection()); // no longer idle
+
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No free connections. " + route);
+ }
+ }
+ return entry;
+ }
+
+
+ /**
+ * Creates a new pool entry.
+ * This method assumes that the new connection will be handed
+ * out immediately.
+ *
+ * @param route the route associated with the new entry
+ * @param op the operator for creating a connection
+ *
+ * @return the new pool entry, holding a new connection
+ */
+ protected synchronized
+ BasicPoolEntry createEntry(HttpRoute route,
+ ClientConnectionOperator op) {
+
+ RouteConnPool routePool = getRoutePool(route);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new connection. " + route);
+ }
+
+ OperatedClientConnection conn = op.createConnection();
+ BasicPoolEntry entry = new BasicPoolEntry
+ (connManager, conn, route, refQueue);
+ numConnections++;
+ routePool.numConnections++;
+
+ // store a reference to this entry so that it can be cleaned up
+ // in the event it is not correctly released
+ BadStaticMaps.storeReferenceToConnection(entry, route, this); //@@@
+ issuedConnections.add(entry.getWeakRef());
+
+ return entry;
+ }
+
+
+ /**
+ * Deletes a given pool entry.
+ * This closes the pooled connection and removes all references,
+ * so that it can be GCed.
+ *
+ * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
+ * It is assumed that the caller has already handled this step.</p>
+ * <!-- @@@ is that a good idea? or rather fix it? -->
+ *
+ * @param entry the pool entry for the connection to delete
+ */
+ protected synchronized void deleteEntry(BasicPoolEntry entry) {
+
+ HttpRoute route = entry.getPlannedRoute();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleting connection. " + route);
+ }
+
+ closeConnection(entry.getConnection());
+
+ RouteConnPool routePool = getRoutePool(route);
+ routePool.freeConnections.remove(entry);
+ routePool.numConnections--;
+ numConnections--;
+ if ((routePool.numConnections < 1) &&
+ routePool.waitingThreads.isEmpty()) {
+
+ routeToPool.remove(route);
+ }
+
+ idleConnHandler.remove(entry.getConnection()); // not idle, but dead
+ }
+
+
+ /**
+ * Delete an old, free pool entry to make room for a new one.
+ * Used to replace pool entries with ones for a different route.
+ */
+ protected synchronized void deleteLeastUsedEntry() {
+
+ //@@@ with get() instead of remove, we could
+ //@@@ leave the removing to deleteEntry()
+ BasicPoolEntry entry = (BasicPoolEntry) freeConnections.removeFirst();
+
+ if (entry != null) {
+ deleteEntry(entry);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("No free connection to delete.");
+ }
+ }
+
+
+ // non-javadoc, see base class AbstractConnPool
+ protected synchronized void handleLostEntry(HttpRoute route) {
+
+ RouteConnPool routePool = getRoutePool(route);
+ routePool.numConnections--;
+
+ if ((routePool.numConnections < 1) &&
+ routePool.waitingThreads.isEmpty()) {
+
+ routeToPool.remove(route);
+ }
+
+ numConnections--;
+ notifyWaitingThread(routePool);
+ }
+
+
+ /**
+ * Notifies a waiting thread that a connection is available.
+ * This will wake a thread waiting in the specific route pool,
+ * if there is one.
+ * Otherwise, a thread in the connection pool will be notified.
+ *
+ * @param routePool the pool in which to notify, or <code>null</code>
+ */
+ protected synchronized void notifyWaitingThread(RouteConnPool routePool) {
+
+ //@@@ while this strategy provides for best connection re-use,
+ //@@@ is it fair? only do this if the connection is open?
+ // Find the thread we are going to notify. We want to ensure that
+ // each waiting thread is only interrupted once, so we will remove
+ // it from all wait queues before interrupting.
+ WaitingThread waitingThread = null;
+
+ if ((routePool != null) && !routePool.waitingThreads.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notifying thread waiting on pool. "
+ + routePool.route);
+ }
+ waitingThread = (WaitingThread)
+ routePool.waitingThreads.removeFirst();
+ waitingThreads.remove(waitingThread);
+
+ } else if (!waitingThreads.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notifying thread waiting on any pool.");
+ }
+ waitingThread = (WaitingThread) waitingThreads.removeFirst();
+ waitingThread.pool.waitingThreads.remove(waitingThread);
+
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Notifying no-one, there are no waiting threads");
+ }
+
+ if (waitingThread != null) {
+ waitingThread.interruptedByConnectionPool = true;
+ waitingThread.thread.interrupt();
+ }
+ }
+
+
+ //@@@ revise this cleanup stuff
+ //@@@ move method to base class when deleteEntry() is fixed
+ // non-javadoc, see base class AbstractConnPool
+ public synchronized void deleteClosedConnections() {
+
+ Iterator iter = freeConnections.iterator();
+ while (iter.hasNext()) {
+ BasicPoolEntry entry = (BasicPoolEntry) iter.next();
+ if (!entry.getConnection().isOpen()) {
+ iter.remove();
+ deleteEntry(entry);
+ }
+ }
+ }
+
+
+ // non-javadoc, see base class AbstractConnPool
+ public synchronized void shutdown() {
+
+ super.shutdown();
+
+ // close all free connections
+ //@@@ move this to base class?
+ Iterator iter = freeConnections.iterator();
+ while (iter.hasNext()) {
+ BasicPoolEntry entry = (BasicPoolEntry) iter.next();
+ iter.remove();
+ closeConnection(entry.getConnection());
+ }
+
+
+ // interrupt all waiting threads
+ iter = waitingThreads.iterator();
+ while (iter.hasNext()) {
+ WaitingThread waiter = (WaitingThread) iter.next();
+ iter.remove();
+ waiter.interruptedByConnectionPool = true;
+ waiter.thread.interrupt();
+ }
+
+ routeToPool.clear();
+ }
+
+
+} // class ConnPoolByRoute
+
Propchange: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ThreadSafeClientConnManager.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpclient/trunk/module-clien \
t/src/main/java/org/apache/http/impl/conn/tsccm/ThreadSafeClientConnManager.java?view=diff&rev=559958&r1=559957&r2=559958
==============================================================================
--- jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ThreadSafeClientConnManager.java \
(original)
+++ jakarta/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ThreadSafeClientConnManager.java \
Thu Jul 26 12:53:07 2007 @@ -89,7 +89,8 @@
/** The pool of connections being managed. */
- private ConnectionPool connectionPool;
+ //@@@ private ConnectionPool connectionPool;
+ private AbstractConnPool connectionPool;
/** The operator for opening and updating connections. */
//@@@ temporarily visible to BasicPoolEntry
@@ -115,7 +116,8 @@
}
this.params = params;
this.schemeRegistry = schreg;
- this.connectionPool = new ConnectionPool();
+ //@@@ this.connectionPool = new ConnectionPool();
+ this.connectionPool = new ConnPoolByRoute(this);
this.connOperator = createConnectionOperator(schreg);
this.isShutDown = false;
@@ -163,13 +165,15 @@
+ route + ", timeout = " + timeout);
}
- final BasicPoolEntry entry = doGetConnection(route, timeout);
+ //@@@ final BasicPoolEntry entry = doGetConnection(route, timeout);
+ final BasicPoolEntry entry =
+ connectionPool.getEntry(route, timeout, connOperator);
return new TSCCMConnAdapter(this, entry);
}
- /**
+ /* *
* Obtains a connection within the given timeout.
*
* @param route the route for which to get the connection
@@ -178,7 +182,7 @@
* @return the pool entry for the connection
*
* @throws ConnectionPoolTimeoutException if the timeout expired
- */
+ * /
private BasicPoolEntry doGetConnection(HttpRoute route,
long timeout)
throws ConnectionPoolTimeoutException {
@@ -295,20 +299,21 @@
return entry;
} // doGetConnection
+ */
-
- /**
+ /* *
* Creates a connection to be managed, along with a pool entry.
*
* @param route the route for which to create the connection
*
* @return the pool entry for the new connection
- */
+ * /
private BasicPoolEntry createPoolEntry(HttpRoute route) {
OperatedClientConnection occ = connOperator.createConnection();
return connectionPool.createEntry(route, occ);
}
+ */
/**
@@ -392,7 +397,8 @@
if (entry == null)
return;
- connectionPool.freeConnection(entry);
+ //@@@ connectionPool.freeConnection(entry);
+ connectionPool.freeEntry(entry);
}
@@ -434,10 +440,13 @@
* @return the total number of pooled connections for that route
*/
public int getConnectionsInPool(HttpRoute route) {
+ return ((ConnPoolByRoute)connectionPool).getConnectionsInPool(route);
+/*
synchronized (connectionPool) {
RouteConnPool routePool = connectionPool.getRoutePool(route);
return routePool.numConnections;
}
+*/
}
/**
@@ -501,7 +510,7 @@
* as well as per-route lists.
*/
//@@@ temporary package visibility, for BadStaticMaps
- class /*default*/ ConnectionPool implements RefQueueHandler {
+ /*default*/ class ConnectionPool implements RefQueueHandler {
/** The list of free connections */
private LinkedList freeConnections = new LinkedList();
@@ -582,7 +591,7 @@
}
}
//@@@ while the static map exists, call there to clean it up
- BadStaticMaps.shutdownCheckedOutConnections(this); //@@@
+ //BadStaticMaps.shutdownCheckedOutConnections(this); //@@@
// interrupt all waiting threads
iter = waitingThreads.iterator();
@@ -625,7 +634,7 @@
// store a reference to this entry so that it can be cleaned up
// in the event it is not correctly released
- BadStaticMaps.storeReferenceToConnection(entry, route, this); //@@@
+ //BadStaticMaps.storeReferenceToConnection(entry, route, this); //@@@
issuedConnections.add(entry.getWeakRef());
return entry;
@@ -719,7 +728,7 @@
// store a reference to this entry so that it can be cleaned up
// in the event it is not correctly released
- BadStaticMaps.storeReferenceToConnection(entry, route, this); //@@@
+ //BadStaticMaps.storeReferenceToConnection(entry, route, this); \
//@@@ issuedConnections.add(entry.getWeakRef());
if (LOG.isDebugEnabled()) {
LOG.debug("Getting free connection, route=" + route);
@@ -911,6 +920,7 @@
} // class ConnectionPool
+ //@@@ move to pool?
static /*default*/ void closeConnection(final OperatedClientConnection conn) {
if (conn != null) {
try {
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic