[prev in list] [next in list] [prev in thread] [next in thread]
List: jboss-cvs-commits
Subject: [jboss-cvs] jbosscx/src/main/org/jboss/resource/adapter/jms JmsMapMessage.java JmsBytesMessage.java
From: Adrian Brock <ejort () users ! sourceforge ! net>
Date: 2004-02-28 19:07:05
Message-ID: E1Ax9nl-0001NP-3n () sc8-pr-cvs1 ! sourceforge ! net
[Download RAW message or body]
User: ejort
Date: 04/02/28 11:07:04
Modified: src/main/org/jboss/resource/adapter/jms
JmsManagedConnectionFactory.java
JmsConnectionManager.java
JmsConnectionFactoryImpl.java JmsCred.java
JmsConnectionRequestInfo.java JmsSession.java
JmsManagedConnection.java JmsLocalTransaction.java
JmsSessionFactory.java JmsSessionFactoryImpl.java
Added: src/main/org/jboss/resource/adapter/jms JmsMapMessage.java
JmsBytesMessage.java JmsQueueReceiver.java
JmsMessageListener.java JmsConnectionMetaData.java
JmsStreamMessage.java JmsObjectMessage.java
JmsMessageConsumer.java JmsTopicSubscriber.java
JmsTextMessage.java JmsMessage.java
Log:
Major improvements in the JmsRA
Revision Changes Path
1.8 +263 -233 jbosscx/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java
Index: JmsManagedConnectionFactory.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsManagedConnectionFactory.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- JmsManagedConnectionFactory.java 27 Aug 2003 04:29:38 -0000 1.7
+++ JmsManagedConnectionFactory.java 28 Feb 2004 19:07:03 -0000 1.8
@@ -16,6 +16,7 @@
import javax.security.auth.Subject;
+import javax.jms.ConnectionMetaData;
import javax.resource.ResourceException;
import javax.resource.spi.ManagedConnectionFactory;
@@ -33,248 +34,277 @@
/**
* ???
- *
+ *
* Created: Sat Mar 31 03:08:35 2001
- *
- * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @version $Revision: 1.7 $
+ *
+ * @author <a href="mailto:peter.antman@tim.se">Peter Antman </a>.
+ * @version $Revision: 1.8 $
*/
-public class JmsManagedConnectionFactory
- implements ManagedConnectionFactory
+public class JmsManagedConnectionFactory implements ManagedConnectionFactory
{
- private static final Logger log = Logger.getLogger(JmsManagedConnection.class);
+ private static final Logger log = Logger.getLogger(JmsManagedConnection.class);
- /** Settable attributes in ra.xml */
- private JmsMCFProperties mcfProperties = new JmsMCFProperties();
+ /** Settable attributes in ra.xml */
+ private JmsMCFProperties mcfProperties = new JmsMCFProperties();
- /** For local access. */
- private JMSProviderAdapter adapter;
-
- public JmsManagedConnectionFactory() {
- // empty
- }
-
- /**
- * Create a "non managed" connection factory. No appserver involved
- */
- public Object createConnectionFactory() throws ResourceException
- {
- return createConnectionFactory(null);
- }
-
- /**
- * Create a ConnectionFactory with appserver hook
- */
- public Object createConnectionFactory(ConnectionManager cxManager)
- throws ResourceException
- {
- Object cf = new JmsConnectionFactoryImpl(this, cxManager);
-
- if (log.isTraceEnabled()) {
- log.trace("Created connection factory: " + cf + ", using connection manager: " + cxManager);
- }
-
- return cf;
- }
-
- /**
- * Create a new connection to manage in pool
- */
- public ManagedConnection createManagedConnection(Subject subject,
- ConnectionRequestInfo info)
- throws ResourceException
- {
- boolean trace = log.isTraceEnabled();
-
- info = getInfo(info);
- if (trace) log.trace("connection request info: " + info);
-
- JmsCred cred = JmsCred.getJmsCred(this,subject, info);
- if (trace) log.trace("jms credentials: " + cred);
-
- // OK we got autentication stuff
- JmsManagedConnection mc =
- new JmsManagedConnection(this, info, cred.name, cred.pwd);
-
- if (trace) log.trace("created new managed connection: " + mc);
-
- // Set default logwriter according to spec
-
- //
- // jason: screw the logWriter stuff for now it sucks ass
- //
-
- return mc;
- }
+ /** For local access. */
+ private JMSProviderAdapter adapter;
- /**
- * Match a set of connections from the pool
- */
- public ManagedConnection matchManagedConnections(Set connectionSet,
- Subject subject,
- ConnectionRequestInfo info)
- throws ResourceException
- {
- boolean trace = log.isTraceEnabled();
-
- // Get cred
- info = getInfo(info);
- JmsCred cred = JmsCred.getJmsCred(this, subject, info);
-
- if (trace) log.trace("Looking for connection matching credentials: " + cred);
-
- // Traverse the pooled connections and look for a match, return first found
- Iterator connections = connectionSet.iterator();
-
- while (connections.hasNext()) {
- Object obj = connections.next();
-
- // We only care for connections of our own type
- if (obj instanceof JmsManagedConnection) {
- // This is one from the pool
- JmsManagedConnection mc = (JmsManagedConnection) obj;
-
- // Check if we even created this on
- ManagedConnectionFactory mcf =
- mc.getManagedConnectionFactory();
-
- // Only admit a connection if it has the same username as our
- // asked for creds
-
- // FIXME, Here we have a problem, jms connection
- // may be anonymous, have a user name
-
- if ((mc.getUserName() == null ||
- (mc.getUserName() != null &&
- mc.getUserName().equals(cred.name))) && mcf.equals(this))
- {
- // Now check if ConnectionInfo equals
- if (info.equals( mc.getInfo() )) {
-
- if (trace) log.trace("Found matching connection: " + mc);
-
- return mc;
- }
- }
- }
- }
-
- if (trace) log.trace("No matching connection was found");
-
- return null;
- }
+ public JmsManagedConnectionFactory()
+ {
+ // empty
+ }
+
+ /**
+ * Create a "non managed" connection factory. No appserver involved
+ */
+ public Object createConnectionFactory() throws ResourceException
+ {
+ return createConnectionFactory(null);
+ }
+
+ /**
+ * Create a ConnectionFactory with appserver hook
+ */
+ public Object createConnectionFactory(ConnectionManager cxManager) throws ResourceException
+ {
+ Object cf = new JmsConnectionFactoryImpl(this, cxManager);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Created connection factory: " + cf + ", using connection manager: " + cxManager);
+ }
+
+ return cf;
+ }
+
+ /**
+ * Create a new connection to manage in pool
+ */
+ public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo info)
+ throws ResourceException
+ {
+ boolean trace = log.isTraceEnabled();
+
+ info = getInfo(info);
+ if (trace)
+ log.trace("connection request info: " + info);
+
+ JmsCred cred = JmsCred.getJmsCred(this, subject, info);
+ if (trace)
+ log.trace("jms credentials: " + cred);
+
+ // OK we got autentication stuff
+ JmsManagedConnection mc = new JmsManagedConnection(this, info, cred.name, cred.pwd);
+
+ if (trace)
+ log.trace("created new managed connection: " + mc);
+
+ // Set default logwriter according to spec
+
+ //
+ // jason: screw the logWriter stuff for now it sucks ass
+ //
+
+ return mc;
+ }
+
+ /**
+ * Match a set of connections from the pool
+ */
+ public ManagedConnection matchManagedConnections(Set connectionSet, Subject subject, ConnectionRequestInfo info)
+ throws ResourceException
+ {
+ boolean trace = log.isTraceEnabled();
+
+ // Get cred
+ info = getInfo(info);
+ JmsCred cred = JmsCred.getJmsCred(this, subject, info);
+
+ if (trace)
+ log.trace("Looking for connection matching credentials: " + cred);
+
+ // Traverse the pooled connections and look for a match, return first
+ // found
+ Iterator connections = connectionSet.iterator();
+
+ while (connections.hasNext())
+ {
+ Object obj = connections.next();
+
+ // We only care for connections of our own type
+ if (obj instanceof JmsManagedConnection)
+ {
+ // This is one from the pool
+ JmsManagedConnection mc = (JmsManagedConnection) obj;
+
+ // Check if we even created this on
+ ManagedConnectionFactory mcf = mc.getManagedConnectionFactory();
+
+ // Only admit a connection if it has the same username as our
+ // asked for creds
+
+ // FIXME, Here we have a problem, jms connection
+ // may be anonymous, have a user name
+
+ if ((mc.getUserName() == null || (mc.getUserName() != null && mc.getUserName().equals(cred.name)))
+ && mcf.equals(this))
+ {
+ // Now check if ConnectionInfo equals
+ if (info.equals(mc.getInfo()))
+ {
+
+ if (trace)
+ log.trace("Found matching connection: " + mc);
+
+ return mc;
+ }
+ }
+ }
+ }
+
+ if (trace)
+ log.trace("No matching connection was found");
+
+ return null;
+ }
+
+ public void setLogWriter(PrintWriter out) throws ResourceException
+ {
+ //
+ // jason: screw the logWriter stuff for now it sucks ass
+ //
+ }
+
+ public PrintWriter getLogWriter() throws ResourceException
+ {
+ //
+ // jason: screw the logWriter stuff for now it sucks ass
+ //
+
+ return null;
+ }
+
+ /**
+ * Checks for equality ower the configured properties.
+ */
+ public boolean equals(Object obj)
+ {
+ if (obj == null)
+ return false;
+ if (obj instanceof JmsManagedConnectionFactory)
+ {
+ return mcfProperties.equals(((JmsManagedConnectionFactory) obj).getProperties());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return mcfProperties.hashCode();
+ }
+
+ // --- Connfiguration API ---
+
+ public void setJmsProviderAdapterJNDI(String jndi)
+ {
+ mcfProperties.setProviderJNDI(jndi);
+ }
+
+ public String getJmsProviderAdapterJNDI()
+ {
+ return mcfProperties.getProviderJNDI();
+ }
+
+ /**
+ * Set userName, null by default.
+ */
+ public void setUserName(String userName)
+ {
+ mcfProperties.setUserName(userName);
+ }
+
+ /**
+ * Get userName, may be null.
+ */
+ public String getUserName()
+ {
+ return mcfProperties.getUserName();
+ }
+
+ /**
+ * Set password, null by default.
+ */
+ public void setPassword(String password)
+ {
+ mcfProperties.setPassword(password);
+ }
+
+ /**
+ * Get password, may be null.
+ */
+ public String getPassword()
+ {
+ return mcfProperties.getPassword();
+ }
+
+ /**
+ * Set the default session typ
+ *
+ * @param type either javax.jms.Topic or javax.jms.Queue
+ *
+ * @exception ResourceException if type was not a valid type.
+ */
+ public void setSessionDefaultType(String type) throws ResourceException
+ {
+ mcfProperties.setSessionDefaultType(type);
+ }
+
+ public String getSessionDefaultType()
+ {
+ return mcfProperties.getSessionDefaultType();
+ }
+
+ /**
+ * For local access
+ */
+ public void setJmsProviderAdapter(final JMSProviderAdapter adapter)
+ {
+ this.adapter = adapter;
+ }
+
+ public JMSProviderAdapter getJmsProviderAdapter()
+ {
+ return adapter;
+ }
+
+ private ConnectionRequestInfo getInfo(ConnectionRequestInfo info)
+ {
+ if (info == null)
+ {
+ // Create a default one
+ return new JmsConnectionRequestInfo(mcfProperties);
+ }
+ else
+ {
+ // Fill the one with any defaults
+ ((JmsConnectionRequestInfo) info).setDefaults(mcfProperties);
+ return info;
+ }
+ }
- public void setLogWriter(PrintWriter out)
- throws ResourceException
+ public ConnectionMetaData getMetaData()
{
- //
- // jason: screw the logWriter stuff for now it sucks ass
- //
- }
-
- public PrintWriter getLogWriter() throws ResourceException {
- //
- // jason: screw the logWriter stuff for now it sucks ass
- //
-
- return null;
- }
-
- /**
- * Checks for equality ower the configured properties.
- */
- public boolean equals(Object obj) {
- if (obj == null) return false;
- if (obj instanceof JmsManagedConnectionFactory) {
- return mcfProperties.equals( ((JmsManagedConnectionFactory)obj).getProperties());
- }
- else {
- return false;
- }
- }
-
- public int hashCode() {
- return mcfProperties.hashCode();
- }
-
- // --- Connfiguration API ---
-
- public void setJmsProviderAdapterJNDI(String jndi) {
- mcfProperties.setProviderJNDI(jndi);
- }
-
- public String getJmsProviderAdapterJNDI() {
- return mcfProperties.getProviderJNDI();
- }
-
- /**
- * Set userName, null by default.
- */
- public void setUserName(String userName) {
- mcfProperties.setUserName(userName);
- }
-
- /**
- * Get userName, may be null.
- */
- public String getUserName() {
- return mcfProperties.getUserName();
+ return new JmsConnectionMetaData();
}
- /**
- * Set password, null by default.
- */
- public void setPassword(String password) {
- mcfProperties.setPassword(password);
- }
- /**
- * Get password, may be null.
- */
- public String getPassword() {
- return mcfProperties.getPassword();
- }
-
- /**
- * Set the default session typ
- *
- * @param type either javax.jms.Topic or javax.jms.Queue
- *
- * @exception ResourceException if type was not a valid type.
- */
- public void setSessionDefaultType(String type) throws ResourceException {
- mcfProperties.setSessionDefaultType(type);
- }
-
- public String getSessionDefaultType() {
- return mcfProperties.getSessionDefaultType();
- }
-
- /**
- * For local access
- */
- public void setJmsProviderAdapter(final JMSProviderAdapter adapter) {
- this.adapter = adapter;
- }
-
- public JMSProviderAdapter getJmsProviderAdapter() {
- return adapter;
- }
+ //---- MCF to MCF API
- private ConnectionRequestInfo getInfo(ConnectionRequestInfo info) {
- if (info == null) {
- // Create a default one
- return new JmsConnectionRequestInfo(mcfProperties);
- }
- else {
- // Fill the one with any defaults
- ((JmsConnectionRequestInfo)info).setDefaults(mcfProperties);
- return info;
- }
- }
-
- //---- MCF to MCF API
-
- protected JmsMCFProperties getProperties() {
- return mcfProperties;
- }
+ protected JmsMCFProperties getProperties()
+ {
+ return mcfProperties;
+ }
}
1.6 +3 -5 jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionManager.java
Index: JmsConnectionManager.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionManager.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- JmsConnectionManager.java 27 Aug 2003 04:29:38 -0000 1.5
+++ JmsConnectionManager.java 28 Feb 2004 19:07:03 -0000 1.6
@@ -26,7 +26,7 @@
* <p>Created: Thu Mar 29 16:09:26 2001
*
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class JmsConnectionManager
implements ConnectionManager
@@ -54,16 +54,14 @@
throws ResourceException
{
boolean trace = log.isTraceEnabled();
- if (trace) {
+ if (trace)
log.trace("Allocating connection; mcf=" + mcf + ", cxRequestInfo=" + cxRequestInfo);
- }
ManagedConnection mc = mcf.createManagedConnection(null, cxRequestInfo);
Object c = mc.getConnection(null, cxRequestInfo);
- if (trace) {
+ if (trace)
log.trace("Allocated connection: " + c + ", with managed connection: " + mc);
- }
return c;
}
1.9 +2 -2 jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java
Index: JmsConnectionFactoryImpl.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionFactoryImpl.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- JmsConnectionFactoryImpl.java 28 Aug 2003 19:51:03 -0000 1.8
+++ JmsConnectionFactoryImpl.java 28 Feb 2004 19:07:03 -0000 1.9
@@ -32,7 +32,7 @@
*
* Created: Thu Apr 26 17:02:50 2001
*
- * @version <tt>$Revision: 1.8 $</tt>
+ * @version <tt>$Revision: 1.9 $</tt>
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
* @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
*/
@@ -138,7 +138,7 @@
Connection c = new JmsSessionFactoryImpl(mcf, cm, BOTH);
if (log.isTraceEnabled())
- log.trace("Created topic connection: " + c);
+ log.trace("Created connection: " + c);
return c;
}
1.5 +60 -59 jbosscx/src/main/org/jboss/resource/adapter/jms/JmsCred.java
Index: JmsCred.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsCred.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- JmsCred.java 27 Aug 2003 04:29:38 -0000 1.4
+++ JmsCred.java 28 Feb 2004 19:07:03 -0000 1.5
@@ -21,69 +21,70 @@
import javax.resource.spi.security.PasswordCredential;
/**
- * ???
- *
+ * Credential information
+ *
* Created: Sat Mar 31 03:23:30 2001
- *
- * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @version $Revision: 1.4 $
+ *
+ * @author <a href="mailto:peter.antman@tim.se">Peter Antman </a>.
+ * @version $Revision: 1.5 $
*/
public class JmsCred
{
- public String name;
- public String pwd;
-
- public JmsCred() {
- // empty
- }
+ public String name;
- /**
- * Get our own simple cred
- */
- public static JmsCred getJmsCred(ManagedConnectionFactory mcf,
- Subject subject,
- ConnectionRequestInfo info)
- throws SecurityException
- {
- JmsCred jc = new JmsCred();
- if (subject == null && info !=null )
- {
- // Credentials specifyed on connection request
- jc.name = ((JmsConnectionRequestInfo)info).getUserName();
- jc.pwd = ((JmsConnectionRequestInfo)info).getPassword();
- }
- else if (subject != null)
- {
- // Credentials from appserver
- Set creds =
- subject.getPrivateCredentials(PasswordCredential.class);
- PasswordCredential pwdc = null;
- Iterator credentials = creds.iterator();
- while (credentials.hasNext())
- {
- PasswordCredential curCred =
- (PasswordCredential) credentials.next();
- if (curCred.getManagedConnectionFactory().equals(mcf)) {
- pwdc = curCred;
- break;
- }
- }
-
- if (pwdc == null) {
- // No hit - we do need creds
- throw new SecurityException("No Passwdord credentials found");
- }
- jc.name = pwdc.getUserName();
- jc.pwd = new String(pwdc.getPassword());
- }
- else {
- throw new SecurityException("No Subject or ConnectionRequestInfo set, could not get credentials");
- }
- return jc;
- }
+ public String pwd;
- public String toString()
- {
- return super.toString() + "{ username=" + name + ", password=" + pwd + " }";
- }
+ public JmsCred()
+ {
+ // empty
+ }
+
+ /**
+ * Get our own simple cred
+ */
+ public static JmsCred getJmsCred(ManagedConnectionFactory mcf, Subject subject, ConnectionRequestInfo info)
+ throws SecurityException
+ {
+ JmsCred jc = new JmsCred();
+ if (subject == null && info != null)
+ {
+ // Credentials specifyed on connection request
+ jc.name = ((JmsConnectionRequestInfo) info).getUserName();
+ jc.pwd = ((JmsConnectionRequestInfo) info).getPassword();
+ }
+ else if (subject != null)
+ {
+ // Credentials from appserver
+ Set creds = subject.getPrivateCredentials(PasswordCredential.class);
+ PasswordCredential pwdc = null;
+ Iterator credentials = creds.iterator();
+ while (credentials.hasNext())
+ {
+ PasswordCredential curCred = (PasswordCredential) credentials.next();
+ if (curCred.getManagedConnectionFactory().equals(mcf))
+ {
+ pwdc = curCred;
+ break;
+ }
+ }
+
+ if (pwdc == null)
+ {
+ // No hit - we do need creds
+ throw new SecurityException("No Passwdord credentials found");
+ }
+ jc.name = pwdc.getUserName();
+ jc.pwd = new String(pwdc.getPassword());
+ }
+ else
+ {
+ throw new SecurityException("No Subject or ConnectionRequestInfo set, could not get credentials");
+ }
+ return jc;
+ }
+
+ public String toString()
+ {
+ return super.toString() + "{ username=" + name + ", password=" + pwd + " }";
+ }
}
1.6 +34 -6 jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionRequestInfo.java
Index: JmsConnectionRequestInfo.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionRequestInfo.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- JmsConnectionRequestInfo.java 28 Aug 2003 19:51:04 -0000 1.5
+++ JmsConnectionRequestInfo.java 28 Feb 2004 19:07:03 -0000 1.6
@@ -16,18 +16,19 @@
import org.jboss.util.Strings;
/**
- * ???
+ * Request information used in pooling
*
* Created: Thu Mar 29 16:29:55 2001
*
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class JmsConnectionRequestInfo
implements ConnectionRequestInfo
{
private String userName;
private String password;
+ private String clientID;
private boolean transacted = true;
private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
@@ -86,6 +87,16 @@
this.password = password;
}
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+ public void setClientID(String clientID)
+ {
+ this.clientID = clientID;
+ }
+
public boolean isTransacted()
{
return transacted;
@@ -111,17 +122,34 @@
this.acknowledgeMode == you.getAcknowledgeMode() &&
this.type == you.getType() &&
Strings.compare(userName, you.getUserName()) &&
- Strings.compare(password, you.getPassword()));
+ Strings.compare(password, you.getPassword()) &&
+ Strings.compare(clientID, you.getClientID()));
}
else
return false;
}
- // FIXME !!
public int hashCode()
{
- String result = "" + userName + password + transacted + acknowledgeMode + type;
- return result.hashCode();
+ int hashCode = 0;
+ if (transacted)
+ hashCode += 1;
+ if (type == JmsConnectionFactory.QUEUE)
+ hashCode += 3;
+ else if (type == JmsConnectionFactory.TOPIC)
+ hashCode += 5;
+ if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
+ hashCode += 7;
+ else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
+ hashCode += 11;
+ if (userName != null)
+ hashCode += userName.hashCode();
+ if (password != null)
+ hashCode += password.hashCode();
+ if (clientID != null)
+ hashCode += clientID.hashCode();
+
+ return hashCode;
}
/**
1.7 +394 -268 jbosscx/src/main/org/jboss/resource/adapter/jms/JmsSession.java
Index: JmsSession.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsSession.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- JmsSession.java 28 Aug 2003 19:51:04 -0000 1.6
+++ JmsSession.java 28 Feb 2004 19:07:03 -0000 1.7
@@ -10,9 +10,12 @@
package org.jboss.resource.adapter.jms;
import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.MapMessage;
@@ -43,299 +46,422 @@
/**
* Adapts the JMS QueueSession and TopicSession API to a JmsManagedConnection.
- *
+ *
* <p>Created: Tue Apr 17 22:39:45 2001
- *
- * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>.
- * @version $Revision: 1.6 $
+ *
+ * @author <a href="mailto:peter.antman@tim.se">Peter Antman </a>.
+ * @author <a href="mailto:jason@planet57.com">Jason Dillon </a>.
+ * @version $Revision: 1.7 $
*/
-public class JmsSession
- implements Session, QueueSession, TopicSession
+public class JmsSession implements Session, QueueSession, TopicSession
{
- private static final Logger log = Logger.getLogger(JmsSession.class);
-
- /** The managed connection for this session. */
- private JmsManagedConnection mc; // = null;
-
- /**
- * Construct a <tt>JmsSession</tt>.
- *
- * @param mc The managed connection for this session.
- */
- public JmsSession(final JmsManagedConnection mc) {
- this.mc = mc;
- }
+ private static final Logger log = Logger.getLogger(JmsSession.class);
- /**
- * Ensure that the session is opened.
- *
- * @return The session
- *
- * @throws IllegalStateException The session is closed
- */
- private Session getSession() throws JMSException {
- // ensure that the connection is opened
- if (mc == null)
- throw new IllegalStateException("The session is closed");
+ /** The managed connection for this session. */
+ private JmsManagedConnection mc; // = null;
- return mc.getSession();
- }
+ /** The session factory for this session */
+ private JmsSessionFactory sf;
- // ---- Session API
-
- public BytesMessage createBytesMessage() throws JMSException
- {
- return getSession().createBytesMessage();
- }
-
- public MapMessage createMapMessage() throws JMSException
- {
- return getSession().createMapMessage();
- }
-
- public Message createMessage() throws JMSException
- {
- return getSession().createMessage();
- }
-
- public ObjectMessage createObjectMessage() throws JMSException
- {
- return getSession().createObjectMessage();
- }
-
- public ObjectMessage createObjectMessage(Serializable object)
- throws JMSException
- {
- return getSession().createObjectMessage(object);
- }
-
- public StreamMessage createStreamMessage() throws JMSException
- {
- return getSession().createStreamMessage();
- }
-
- public TextMessage createTextMessage() throws JMSException
- {
- return getSession().createTextMessage();
- }
+ /** The message consumers */
+ private HashSet consumers = new HashSet();
+
+ /** The message producers */
+ private HashSet producers = new HashSet();
+
+ /**
+ * Construct a <tt>JmsSession</tt>.
+ *
+ * @param mc The managed connection for this session.
+ */
+ public JmsSession(final JmsManagedConnection mc)
+ {
+ this.mc = mc;
+ }
- public TextMessage createTextMessage(String string) throws JMSException
+ public void setJmsSessionFactory(JmsSessionFactory sf)
{
- return getSession().createTextMessage(string);
+ this.sf = sf;
}
- public boolean getTransacted() throws JMSException
+ /**
+ * Ensure that the session is opened.
+ *
+ * @return The session
+ *
+ * @throws IllegalStateException The session is closed
+ */
+ Session getSession() throws JMSException
+ {
+ // ensure that the connection is opened
+ if (mc == null)
+ throw new IllegalStateException("The session is closed");
+
+ return mc.getSession();
+ }
+
+ // ---- Session API
+
+ public BytesMessage createBytesMessage() throws JMSException
+ {
+ return getSession().createBytesMessage();
+ }
+
+ public MapMessage createMapMessage() throws JMSException
+ {
+ return getSession().createMapMessage();
+ }
+
+ public Message createMessage() throws JMSException
+ {
+ return getSession().createMessage();
+ }
+
+ public ObjectMessage createObjectMessage() throws JMSException
+ {
+ return getSession().createObjectMessage();
+ }
+
+ public ObjectMessage createObjectMessage(Serializable object) throws JMSException
+ {
+ return getSession().createObjectMessage(object);
+ }
+
+ public StreamMessage createStreamMessage() throws JMSException
+ {
+ return getSession().createStreamMessage();
+ }
+
+ public TextMessage createTextMessage() throws JMSException
+ {
+ return getSession().createTextMessage();
+ }
+
+ public TextMessage createTextMessage(String string) throws JMSException
+ {
+ return getSession().createTextMessage(string);
+ }
+
+ public boolean getTransacted() throws JMSException
+ {
+ return getSession().getTransacted();
+ }
+
+ /**
+ * Always throws an Exception.
+ *
+ * @throws IllegalStateException Method not allowed.
+ */
+ public MessageListener getMessageListener() throws JMSException
+ {
+ throw new IllegalStateException("Method not allowed");
+ }
+
+ /**
+ * Always throws an Exception.
+ *
+ * @throws IllegalStateException Method not allowed.
+ */
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+ throw new IllegalStateException("Method not allowed");
+ }
+
+ /**
+ * Always throws an Error.
+ *
+ * @throws Error Method not allowed.
+ */
+ public void run()
+ {
+ // should this really throw an Error?
+ throw new Error("Method not allowed");
+ }
+
+ /**
+ * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
+ * managed connection.
+ *
+ * @throws JMSException Failed to close session.
+ */
+ public void close() throws JMSException
+ {
+ closeSession();
+ }
+
+ // FIXME - is this really OK, probably not
+ public void commit() throws JMSException
+ {
+ getSession().commit();
+ }
+
+ public void rollback() throws JMSException
+ {
+ getSession().rollback();
+ }
+
+ public void recover() throws JMSException
+ {
+ getSession().recover();
+ }
+
+ // --- TopicSession API
+
+ public Topic createTopic(String topicName) throws JMSException
+ {
+ return getSession().createTopic(topicName);
+ }
+
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException
+ {
+ TopicSubscriber result = ((TopicSession) getSession()).createSubscriber(topic);
+ result = new JmsTopicSubscriber(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
+ {
+ TopicSubscriber result = ((TopicSession) getSession()).createSubscriber(topic, messageSelector, noLocal);
+ result = new JmsTopicSubscriber(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+ TopicSubscriber result = getSession().createDurableSubscriber(topic, name);
+ result = new JmsTopicSubscriber(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ TopicSubscriber result = getSession().createDurableSubscriber(topic, name, messageSelector, noLocal);
+ result = new JmsTopicSubscriber(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public TopicPublisher createPublisher(Topic topic) throws JMSException
+ {
+ TopicPublisher result = ((TopicSession) getSession()).createPublisher(topic);
+ addProducer(result);
+ return result;
+ }
+
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ TemporaryTopic temp = getSession().createTemporaryTopic();
+ sf.addTemporaryTopic(temp);
+ return temp;
+ }
+
+ public void unsubscribe(String name) throws JMSException
+ {
+ getSession().unsubscribe(name);
+ }
+
+ //--- QueueSession API
+
+ public QueueBrowser createBrowser(Queue queue) throws JMSException
+ {
+ return getSession().createBrowser(queue);
+ }
+
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+ {
+ return getSession().createBrowser(queue, messageSelector);
+ }
+
+ public Queue createQueue(String queueName) throws JMSException
+ {
+ return getSession().createQueue(queueName);
+ }
+
+ public QueueReceiver createReceiver(Queue queue) throws JMSException
+ {
+ QueueReceiver result = ((QueueSession) getSession()).createReceiver(queue);
+ result = new JmsQueueReceiver(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
+ {
+ QueueReceiver result = ((QueueSession) getSession()).createReceiver(queue, messageSelector);
+ result = new JmsQueueReceiver(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public QueueSender createSender(Queue queue) throws JMSException
+ {
+ QueueSender result = ((QueueSession) getSession()).createSender(queue);
+ addProducer(result);
+ return result;
+ }
+
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ TemporaryQueue temp = getSession().createTemporaryQueue();
+ sf.addTemporaryQueue(temp);
+ return temp;
+ }
+
+ // -- JMS 1.1
+
+ public MessageConsumer createConsumer(Destination destination) throws JMSException
+ {
+ MessageConsumer result = getSession().createConsumer(destination);
+ result = new JmsMessageConsumer(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
+ {
+ MessageConsumer result = getSession().createConsumer(destination, messageSelector);
+ result = new JmsMessageConsumer(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ MessageConsumer result = getSession().createConsumer(destination, messageSelector, noLocal);
+ result = new JmsMessageConsumer(result, this);
+ addConsumer(result);
+ return result;
+ }
+
+ public MessageProducer createProducer(Destination destination) throws JMSException
+ {
+ MessageProducer result = getSession().createProducer(destination);
+ addProducer(result);
+ return result;
+ }
+
+ /**
+ * Reports the acknowledge mode of the session returned by getSession().
+ *
+ * @return the value returned by the underlying session
+ * @throws JMSException propagated from getSession() or the delegate call
+ */
+ public int getAcknowledgeMode() throws JMSException
+ {
+ return getSession().getAcknowledgeMode();
+ }
+
+ // --- JmsManagedConnection api
+
+ /**
+ * Binds this handle to the given managed connection. If the handle is
+ * currently bound to one, it is first removed from that connection's handles.
+ *
+ * @param mc the managed connection to bind to
+ */
+ void setManagedConnection(final JmsManagedConnection mc)
+ {
+ // Deregister from the previous managed connection before switching.
+ if (this.mc != null)
+ this.mc.removeHandle(this);
+ this.mc = mc;
+ }
+
+ /**
+ * Drops the reference to the managed connection. Nothing else is done here:
+ * no event is sent and no consumer or producer is closed.
+ */
+ void destroy()
+ {
+ mc = null;
+ }
+
+ /**
+ * Starts the managed connection this handle is bound to; does nothing
+ * when mc is null.
+ *
+ * @throws JMSException propagated from the managed connection
+ */
+ void start() throws JMSException
{
- return getSession().getTransacted();
+ if (mc != null)
+ mc.start();
}
- /**
- * Always throws an Exception.
- *
- * @throws IllegalStateException Method not allowed.
- */
- public MessageListener getMessageListener() throws JMSException
+ /**
+ * Stops the managed connection this handle is bound to; does nothing
+ * when mc is null.
+ *
+ * @throws JMSException propagated from the managed connection
+ */
+ void stop() throws JMSException
{
- throw new IllegalStateException("Method not allowed");
+ if (mc != null)
+ mc.stop();
}
-
- /**
- * Always throws an Exception.
- *
- * @throws IllegalStateException Method not allowed.
- */
- public void setMessageListener(MessageListener listener)
- throws JMSException
- {
- throw new IllegalStateException("Method not allowed");
- }
-
- /**
- * Always throws an Error.
- *
- * @throws Error Method not allowed.
- */
- public void run() {
- // should this really throw an Error?
- throw new Error("Method not allowed");
- }
-
- /**
- * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
- * managed connection.
- *
- * @throws JMSException Failed to close session.
- */
- public void close() throws JMSException
+
+ /**
+ * Closes this handle: stops the managed connection, closes and forgets
+ * every tracked consumer and producer, removes the handle from the managed
+ * connection and sends it a CONNECTION_CLOSED event. Does nothing when the
+ * handle is not bound to a managed connection (mc is null).
+ *
+ * Failures while stopping or closing are logged at trace level and do not
+ * interrupt the remaining clean-up.
+ *
+ * @throws JMSException declared by the signature; the visible clean-up steps log rather than rethrow
+ */
+ void closeSession() throws JMSException
+ {
+ if (mc != null)
+ {
+ log.trace("Closing session");
+
+ try
+ {
+ mc.stop();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Error stopping managed connection", t);
+ }
+
+ synchronized (consumers)
+ {
+ for (Iterator i = consumers.iterator(); i.hasNext();)
+ {
+ // NOTE(review): assumes every tracked consumer is a JmsMessageConsumer wrapper;
+ // the cast sits outside the try, so any other type would abort the clean-up.
+ JmsMessageConsumer consumer = (JmsMessageConsumer) i.next();
+ try
+ {
+ consumer.closeConsumer();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Error closing consumer", t);
+ }
+ // Forget the consumer whether or not closing it succeeded.
+ i.remove();
+ }
+ }
+
+ synchronized (producers)
+ {
+ for (Iterator i = producers.iterator(); i.hasNext();)
+ {
+ MessageProducer producer = (MessageProducer) i.next();
+ try
+ {
+ producer.close();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Error closing producer", t);
+ }
+ // Forget the producer whether or not closing it succeeded.
+ i.remove();
+ }
+ }
+
+ mc.removeHandle(this);
+ ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED);
+ ev.setConnectionHandle(this);
+ mc.sendEvent(ev);
+ // Unbind so that a second call is a no-op.
+ mc = null;
+ }
+ }
+
+ void addConsumer(MessageConsumer consumer)
{
- if (mc != null) {
- log.debug("Closing session");
-
- // Special stuff FIXME
- mc.removeHandle(this);
- ConnectionEvent ev =
- new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED);
- ev.setConnectionHandle(this);
- mc.sendEvent(ev);
- mc = null;
+ synchronized (consumers)
+ {
+ consumers.add(consumer);
}
}
-
- // FIXME - is this really OK, probably not
- public void commit() throws JMSException
- {
- getSession().commit();
- }
-
- public void rollback() throws JMSException
- {
- getSession().rollback();
- }
-
- public void recover() throws JMSException
- {
- getSession().recover();
- }
-
- // --- TopicSession API
- public Topic createTopic(String topicName) throws JMSException
- {
- return ((TopicSession)getSession()).createTopic(topicName);
- }
-
- public TopicSubscriber createSubscriber(Topic topic) throws JMSException
- {
- return ((TopicSession)getSession()).createSubscriber(topic);
- }
-
- public TopicSubscriber createSubscriber(Topic topic,
- String messageSelector,
- boolean noLocal)
- throws JMSException
- {
- return ((TopicSession)getSession()).
- createSubscriber(topic,messageSelector, noLocal);
- }
-
- public TopicSubscriber createDurableSubscriber(Topic topic,
- String name)
- throws JMSException
+ void removeConsumer(MessageConsumer consumer)
{
- return ((TopicSession)getSession()).
- createDurableSubscriber(topic, name);
- }
-
- public TopicSubscriber createDurableSubscriber(Topic topic,
- String name,
- String messageSelector,
- boolean noLocal)
- throws JMSException
- {
- return ((TopicSession)getSession()).
- createDurableSubscriber(topic, name, messageSelector, noLocal);
- }
-
- public TopicPublisher createPublisher(Topic topic) throws JMSException
- {
- return ((TopicSession)getSession()).createPublisher(topic);
- }
-
- public TemporaryTopic createTemporaryTopic() throws JMSException
- {
- return ((TopicSession)getSession()).createTemporaryTopic();
- }
-
- public void unsubscribe(String name) throws JMSException
- {
- ((TopicSession)getSession()).unsubscribe(name);
+ synchronized (consumers)
+ {
+ consumers.remove(consumer);
+ }
}
-
- //--- QueueSession API
- public QueueBrowser createBrowser(Queue queue) throws JMSException
+ void addProducer(MessageProducer producer)
{
- return ((QueueSession)getSession()).createBrowser(queue);
- }
-
- public QueueBrowser createBrowser(Queue queue,
- String messageSelector)
- throws JMSException
- {
- return ((QueueSession)getSession()).
- createBrowser(queue,messageSelector);
- }
-
- public Queue createQueue(String queueName) throws JMSException
- {
- return ((QueueSession)getSession()).createQueue(queueName);
- }
-
- public QueueReceiver createReceiver(Queue queue) throws JMSException
- {
- return ((QueueSession)getSession()).createReceiver(queue);
- }
-
- public QueueReceiver createReceiver(Queue queue, String messageSelector)
- throws JMSException
- {
- return ((QueueSession)getSession()).
- createReceiver(queue, messageSelector);
- }
-
- public QueueSender createSender(Queue queue) throws JMSException
- {
- return ((QueueSession)getSession()).createSender(queue);
- }
-
- public TemporaryQueue createTemporaryQueue() throws JMSException
- {
- return ((QueueSession)getSession()).createTemporaryQueue();
- }
-
- // -- JMS 1.1
-
- public MessageConsumer createConsumer(Destination destination)
- throws JMSException
- {
- return getSession().createConsumer(destination);
- }
-
- public MessageConsumer createConsumer(Destination destination, String \
messageSelector)
- throws JMSException
- {
- return getSession().createConsumer(destination, messageSelector);
- }
-
- public MessageConsumer createConsumer(Destination destination, String \
messageSelector, boolean noLocal)
- throws JMSException
- {
- return getSession().createConsumer(destination, messageSelector, noLocal);
- }
-
- public MessageProducer createProducer(Destination destination)
- throws JMSException
- {
- return getSession().createProducer(destination);
- }
-
- public int getAcknowledgeMode()
- throws JMSException
- {
- return getSession().getAcknowledgeMode();
+ synchronized (producers)
+ {
+ producers.add(producer);
+ }
}
-
- // --- JmsManagedConnection api
- void setManagedConnection(final JmsManagedConnection mc) {
- if (this.mc != null) {
- this.mc.removeHandle(this);
+ void removeProducer(MessageProducer producer)
+ {
+ synchronized (producers)
+ {
+ producers.remove(producer);
}
- this.mc = mc;
- }
-
- void destroy() {
- mc = null;
}
}
1.11 +22 -5 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java
Index: JmsManagedConnection.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsManagedConnection.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- JmsManagedConnection.java 15 Jan 2004 00:59:45 -0000 1.10
+++ JmsManagedConnection.java 28 Feb 2004 19:07:03 -0000 1.11
@@ -116,7 +116,7 @@
*
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
* @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class JmsManagedConnection
implements ManagedConnection, ExceptionListener
@@ -226,8 +226,17 @@
*/
private void destroyHandles() throws ResourceException
{
+ try
+ {
+ if (con != null)
+ con.stop();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Ignored error stopping connection", t);
+ }
+
Iterator iter = handles.iterator();
-
while (iter.hasNext())
((JmsSession)iter.next()).destroy();
@@ -307,7 +316,7 @@
public void cleanup() throws ResourceException
{
if (isDestroyed)
- throw new IllegalStateException("ManagedConnection already destroyd");
+ throw new IllegalStateException("ManagedConnection already destroyed");
// destory handles
destroyHandles();
@@ -566,6 +575,16 @@
return mcf;
}
+ /**
+ * Starts the underlying connection held in con.
+ *
+ * NOTE(review): con is not null-checked here, whereas destroyHandles()
+ * guards its con.stop() call with a null check - confirm con cannot be
+ * null when this is reached.
+ *
+ * @throws JMSException propagated from the connection
+ */
+ void start() throws JMSException
+ {
+ con.start();
+ }
+
+ /**
+ * Stops the underlying connection held in con.
+ *
+ * NOTE(review): con is not null-checked here, whereas destroyHandles()
+ * guards its con.stop() call with a null check - confirm con cannot be
+ * null when this is reached.
+ *
+ * @throws JMSException propagated from the connection
+ */
+ void stop() throws JMSException
+ {
+ con.stop();
+ }
+
// --- Used by MetaData
/**
@@ -710,8 +729,6 @@
log.debug("xaSession=" + xaQueueSession + ", Session=" + session);
}
- con.start();
-
if (debug)
log.debug("transacted=" + transacted + ", ack=" + ack);
}
1.4 +7 -10 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java
Index: JmsLocalTransaction.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsLocalTransaction.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- JmsLocalTransaction.java 27 Aug 2003 04:29:38 -0000 1.3
+++ JmsLocalTransaction.java 28 Feb 2004 19:07:03 -0000 1.4
@@ -22,7 +22,7 @@
* Created: Tue Apr 17 23:44:05 2001
*
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class JmsLocalTransaction
implements LocalTransaction
@@ -36,15 +36,15 @@
public void begin() throws ResourceException {
// NOOP - begin is automatic in JMS
// Should probably send event
- ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
- mc.sendEvent(ev);
+ // The LOCAL_TRANSACTION_STARTED notification is disabled by this revision (left commented out).
+ //ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
+ //mc.sendEvent(ev);
}
public void commit() throws ResourceException {
try {
mc.getSession().commit();
- ConnectionEvent ev = new ConnectionEvent(mc, \
ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
- mc.sendEvent(ev);
+ //ConnectionEvent ev = new ConnectionEvent(mc, \
ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); + //mc.sendEvent(ev);
}
catch (JMSException ex) {
ResourceException re =
@@ -57,8 +57,8 @@
public void rollback() throws ResourceException {
try {
mc.getSession().rollback();
- ConnectionEvent ev = new ConnectionEvent(mc, \
ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
- mc.sendEvent(ev);
+ //ConnectionEvent ev = new ConnectionEvent(mc, \
ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); + //mc.sendEvent(ev);
}
catch (JMSException ex) {
ResourceException re =
@@ -68,6 +68,3 @@
}
}
}
-
-
-
1.5 +24 -2 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsSessionFactory.java
Index: JmsSessionFactory.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsSessionFactory.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- JmsSessionFactory.java 28 Aug 2003 19:51:04 -0000 1.4
+++ JmsSessionFactory.java 28 Feb 2004 19:07:03 -0000 1.5
@@ -10,7 +10,10 @@
package org.jboss.resource.adapter.jms;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.QueueConnection;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.jms.TopicConnection;
/**
@@ -19,10 +22,29 @@
* <p>Created: Thu Mar 29 15:37:21 2001
*
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
- * @version <pre>$Revision: 1.4 $</pre>
+ * @version <pre>$Revision: 1.5 $</pre>
*/
public interface JmsSessionFactory
extends Connection, TopicConnection, QueueConnection
{
- // empty
+ /**
+ * Registers a temporary queue with this factory.
+ *
+ * @param temp the temporary queue
+ */
+ void addTemporaryQueue(TemporaryQueue temp);
+
+ /**
+ * Registers a temporary topic with this factory.
+ *
+ * @param temp the temporary topic
+ */
+ void addTemporaryTopic(TemporaryTopic temp);
+
+ /**
+ * Notification that a session is closed
+ *
+ * @param session the session that was closed
+ * @throws JMSException for any error
+ */
+ void closeSession(JmsSession session) throws JMSException;
}
1.9 +175 -66 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java
Index: JmsSessionFactoryImpl.java
===================================================================
RCS file: /cvsroot/jboss/jbosscx/src/main/org/jboss/resource/adapter/jms/JmsSessionFactoryImpl.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- JmsSessionFactoryImpl.java 28 Aug 2003 19:51:04 -0000 1.8
+++ JmsSessionFactoryImpl.java 28 Feb 2004 19:07:03 -0000 1.9
@@ -9,6 +9,9 @@
package org.jboss.resource.adapter.jms;
+import java.util.HashSet;
+import java.util.Iterator;
+
import javax.naming.Reference;
import javax.resource.Referenceable;
@@ -21,11 +24,14 @@
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicSession;
@@ -36,7 +42,7 @@
*
* <p>Created: Thu Mar 29 15:36:51 2001
*
- * @version <tt>$Revision: 1.8 $</tt>
+ * @version <tt>$Revision: 1.9 $</tt>
* @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
* @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
*/
@@ -48,24 +54,40 @@
private static final String ISE =
"This method is not applicatable in JMS resource adapter";
+ /** Are we closed? */
+ private boolean closed = false;
+
private Reference reference;
// Used from JmsConnectionFactory
private String userName;
private String password;
+ private String clientID;
private int type;
-
+
+ /* Whether we are started */
+ private boolean started = false;
+
/** JmsRa own factory */
- private ManagedConnectionFactory mcf;
+ private JmsManagedConnectionFactory mcf;
/** Hook to the appserver */
private ConnectionManager cm;
-
+
+ /** The sessions */
+ private HashSet sessions = new HashSet();
+
+ /** The temporary queues */
+ private HashSet tempQueues = new HashSet();
+
+ /** The temporary topics */
+ private HashSet tempTopics = new HashSet();
+
public JmsSessionFactoryImpl(final ManagedConnectionFactory mcf,
final ConnectionManager cm,
final int type)
{
- this.mcf = mcf;
+ this.mcf = (JmsManagedConnectionFactory) mcf;
this.cm = cm;
if (cm == null) {
@@ -78,7 +100,7 @@
this.type = type;
- log.debug("mcf=" + mcf + ", cm=" + cm + ", type=" + type);
+ log.trace("mcf=" + mcf + ", cm=" + cm + ", type=" + type);
}
public void setReference(final Reference reference)
@@ -108,29 +130,11 @@
public QueueSession createQueueSession(final boolean transacted,
final int acknowledgeMode)
throws JMSException
- {
- try
- {
- if (type == JmsConnectionFactory.TOPIC)
- throw new IllegalStateException
- ("Can not get a queue session from a topic connection");
-
- JmsConnectionRequestInfo info =
- new JmsConnectionRequestInfo(transacted, acknowledgeMode, type);
- info.setUserName(userName);
- info.setPassword(password);
-
- return (QueueSession)cm.allocateConnection(mcf, info);
- }
- catch (ResourceException e)
- {
- log.error("could not create session", e);
-
- JMSException je =
- new JMSException("Could not create a session: " + e);
- je.setLinkedException(e);
- throw je;
- }
+ {
+ // Refuse to work on a closed connection, then reject topic-typed factories;
+ // the allocation itself is shared with the other create*Session methods.
+ checkClosed();
+ if (type == JmsConnectionFactory.TOPIC)
+ throw new IllegalStateException("Can not get a queue session from a topic connection");
+ return allocateConnection(transacted, acknowledgeMode, type);
}
public ConnectionConsumer createConnectionConsumer
@@ -149,29 +153,10 @@
final int acknowledgeMode)
throws JMSException
{
- try
- {
- if (type == JmsConnectionFactory.QUEUE)
- throw new IllegalStateException
- ("Can not get a topic session from a session connection");
-
- JmsConnectionRequestInfo info =
- new JmsConnectionRequestInfo(transacted, acknowledgeMode, type);
-
- info.setUserName(userName);
- info.setPassword(password);
-
- return (TopicSession)cm.allocateConnection(mcf, info);
- }
- catch (ResourceException e)
- {
- log.error("could not create session", e);
-
- JMSException je = new JMSException
- ("Could not create a session: " + e);
- je.setLinkedException(e);
- throw je;
- }
+ checkClosed();
+ if (type == JmsConnectionFactory.QUEUE)
+ throw new IllegalStateException("Can not get a topic session from a queue \
connection"); + return allocateConnection(transacted, acknowledgeMode, type);
}
public ConnectionConsumer createConnectionConsumer
@@ -199,17 +184,22 @@
public String getClientID() throws JMSException
{
- throw new IllegalStateException(ISE);
+ // Returns the locally stored client id (null until setClientID has been called);
+ // fails via checkClosed() once close() has been called.
+ checkClosed();
+ return clientID;
}
public void setClientID(String cID) throws JMSException
{
- throw new IllegalStateException(ISE);
+ // The client id may be assigned only once; it is stored locally and later
+ // copied into the connection request info when a session is allocated.
+ checkClosed();
+ if (clientID != null)
+ throw new IllegalStateException("Cannot change client id");
+ clientID = cID;
}
public ConnectionMetaData getMetaData() throws JMSException
{
- throw new IllegalStateException(ISE);
+ // Metadata is supplied by the managed connection factory, not by a live connection.
+ checkClosed();
+ return mcf.getMetaData();
}
public ExceptionListener getExceptionListener() throws JMSException
@@ -225,21 +215,118 @@
public void start() throws JMSException
{
- throw new IllegalStateException(ISE);
+ // Fails via checkClosed() once close() has been called.
+ checkClosed();
+ // Already started: nothing to do.
+ if (started)
+ return;
+ started = true;
+ // Propagate the start to every session currently tracked by this factory.
+ synchronized (sessions)
+ {
+ for (Iterator i = sessions.iterator(); i.hasNext();)
+ {
+ JmsSession session = (JmsSession) i.next();
+ session.start();
+ }
+ }
}
public void stop() throws JMSException
{
- throw new IllegalStateException(ISE);
+ checkClosed();
+ if (started == false)
+ return;
+ started = true;
+ synchronized (sessions)
+ {
+ for (Iterator i = sessions.iterator(); i.hasNext();)
+ {
+ JmsSession session = (JmsSession) i.next();
+ session.stop();
+ }
+ }
}
public void close() throws JMSException
{
- //
- // TODO: close all sessions, for now just do nothing.
- //
+ // Closing twice is a no-op.
+ if (closed)
+ return;
+ closed = true;
+
+ // Close and forget every tracked session; failures are logged and do not stop the clean-up.
+ synchronized (sessions)
+ {
+ for (Iterator i = sessions.iterator(); i.hasNext();)
+ {
+ JmsSession session = (JmsSession) i.next();
+ try
+ {
+ session.closeSession();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Error closing session", t);
+ }
+ i.remove();
+ }
+ }
+
+ // Delete every temporary queue registered through addTemporaryQueue.
+ synchronized (tempQueues)
+ {
+ for (Iterator i = tempQueues.iterator(); i.hasNext();)
+ {
+ TemporaryQueue temp = (TemporaryQueue) i.next();
+ try
+ {
+ temp.delete();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Error deleting temporary queue", t);
+ }
+ i.remove();
+ }
+ }
+
+ // Delete every temporary topic registered through addTemporaryTopic.
+ synchronized (tempTopics)
+ {
+ for (Iterator i = tempTopics.iterator(); i.hasNext();)
+ {
+ TemporaryTopic temp = (TemporaryTopic) i.next();
+ try
+ {
+ temp.delete();
+ }
+ catch (Throwable t)
+ {
+ log.trace("Error deleting temporary topic", t);
+ }
+ i.remove();
+ }
+ }
}
+ /**
+ * Stops tracking the given session. Only the bookkeeping set is touched;
+ * the session itself is not closed here.
+ *
+ * @param session the session to forget
+ * @throws JMSException declared by the interface; not thrown by the visible code
+ */
+ public void closeSession(JmsSession session) throws JMSException
+ {
+ synchronized (sessions)
+ {
+ sessions.remove(session);
+ }
+ }
+
+ /**
+ * Remembers a temporary queue so that close() can delete it.
+ *
+ * @param temp the temporary queue
+ */
+ public void addTemporaryQueue(TemporaryQueue temp)
+ {
+ synchronized(tempQueues)
+ {
+ tempQueues.add(temp);
+ }
+ }
+
+ /**
+ * Remembers a temporary topic so that close() can delete it.
+ *
+ * @param temp the temporary topic
+ */
+ public void addTemporaryTopic(TemporaryTopic temp)
+ {
+ synchronized(tempTopics)
+ {
+ tempTopics.add(temp);
+ }
+ }
+
// -- JMS 1.1
public ConnectionConsumer createConnectionConsumer(Destination destination, \
ServerSessionPool pool, int maxMessages) throws JMSException @@ -255,15 +342,30 @@
public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException
{
+ // Refuse to work on a closed connection; the allocation itself is shared
+ // with the queue/topic variants through allocateConnection.
+ checkClosed();
+ return allocateConnection(transacted, acknowledgeMode, type);
+ }
+
+ protected JmsSession allocateConnection(boolean transacted, int \
acknowledgeMode, int sessionType) throws JMSException + {
try
{
- JmsConnectionRequestInfo info =
- new JmsConnectionRequestInfo(transacted, acknowledgeMode, type);
-
+ if (transacted)
+ acknowledgeMode = Session.SESSION_TRANSACTED;
+ JmsConnectionRequestInfo info = new JmsConnectionRequestInfo(transacted, \
acknowledgeMode, sessionType); info.setUserName(userName);
info.setPassword(password);
-
- return (Session)cm.allocateConnection(mcf, info);
+ info.setClientID(clientID);
+
+ JmsSession session = (JmsSession) cm.allocateConnection(mcf, info);
+ session.setJmsSessionFactory(this);
+ if (started)
+ session.start();
+ synchronized (sessions)
+ {
+ sessions.add(session);
+ }
+ return session;
}
catch (ResourceException e)
{
@@ -273,6 +375,13 @@
("Could not create a session: " + e);
je.setLinkedException(e);
throw je;
- }
+ }
+ }
+
+ /**
+ * Guard used by the public connection methods.
+ *
+ * @throws IllegalStateException when close() has already been called
+ */
+ protected void checkClosed() throws IllegalStateException
+ {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
}
+
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsMapMessage.java
Index: JmsMapMessage.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import java.util.Enumeration;
import javax.jms.JMSException;
import javax.jms.MapMessage;
/**
* A wrapper for a message
*
* @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
* @version $Revision: 1.1 $
*/
public class JmsMapMessage extends JmsMessage implements MapMessage
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
/**
* Create a new wrapper
*
* @param message the message
* @param session the session
*/
public JmsMapMessage(MapMessage message, JmsSession session)
{
super(message, session);
}
// Public --------------------------------------------------------
// MapMessage implementation -------------------------------------
public boolean getBoolean(String name) throws JMSException
{
return ((MapMessage) message).getBoolean(name);
}
public byte getByte(String name) throws JMSException
{
return ((MapMessage) message).getByte(name);
}
public byte[] getBytes(String name) throws JMSException
{
return ((MapMessage) message).getBytes(name);
}
public char getChar(String name) throws JMSException
{
return ((MapMessage) message).getChar(name);
}
public double getDouble(String name) throws JMSException
{
return ((MapMessage) message).getDouble(name);
}
public float getFloat(String name) throws JMSException
{
return ((MapMessage) message).getFloat(name);
}
public int getInt(String name) throws JMSException
{
return ((MapMessage) message).getInt(name);
}
public long getLong(String name) throws JMSException
{
return ((MapMessage) message).getLong(name);
}
public Enumeration getMapNames() throws JMSException
{
return ((MapMessage) message).getMapNames();
}
public Object getObject(String name) throws JMSException
{
return ((MapMessage) message).getObject(name);
}
public short getShort(String name) throws JMSException
{
return ((MapMessage) message).getShort(name);
}
public String getString(String name) throws JMSException
{
return ((MapMessage) message).getString(name);
}
public boolean itemExists(String name) throws JMSException
{
return ((MapMessage) message).itemExists(name);
}
public void setBoolean(String name, boolean value) throws JMSException
{
((MapMessage) message).setBoolean(name, value);
}
public void setByte(String name, byte value) throws JMSException
{
((MapMessage) message).setByte(name, value);
}
public void setBytes(String name, byte[] value, int offset, int length) throws \
JMSException {
((MapMessage) message).setBytes(name, value, offset, length);
}
public void setBytes(String name, byte[] value) throws JMSException
{
((MapMessage) message).setBytes(name, value);
}
public void setChar(String name, char value) throws JMSException
{
((MapMessage) message).setChar(name, value);
}
public void setDouble(String name, double value) throws JMSException
{
((MapMessage) message).setDouble(name, value);
}
public void setFloat(String name, float value) throws JMSException
{
((MapMessage) message).setFloat(name, value);
}
public void setInt(String name, int value) throws JMSException
{
((MapMessage) message).setInt(name, value);
}
public void setLong(String name, long value) throws JMSException
{
((MapMessage) message).setLong(name, value);
}
public void setObject(String name, Object value) throws JMSException
{
((MapMessage) message).setObject(name, value);
}
public void setShort(String name, short value) throws JMSException
{
((MapMessage) message).setShort(name, value);
}
public void setString(String name, String value) throws JMSException
{
((MapMessage) message).setString(name, value);
}
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsBytesMessage.java
Index: JmsBytesMessage.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
/**
 * Resource-adapter wrapper around a provider {@link BytesMessage}.
 *
 * Every BytesMessage operation is forwarded unchanged to the wrapped message;
 * the class adds no behaviour of its own beyond what JmsMessage provides.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsBytesMessage extends JmsMessage implements BytesMessage
{
   // Constants -----------------------------------------------------

   // Attributes ----------------------------------------------------

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param message the message to delegate to
    * @param session the session handed to the superclass constructor
    */
   public JmsBytesMessage(BytesMessage message, JmsSession session)
   {
      super(message, session);
   }

   // Public --------------------------------------------------------

   // BytesMessage implementation -----------------------------------

   public long getBodyLength() throws JMSException
   {
      return bytesMessage().getBodyLength();
   }

   public boolean readBoolean() throws JMSException
   {
      return bytesMessage().readBoolean();
   }

   public byte readByte() throws JMSException
   {
      return bytesMessage().readByte();
   }

   public int readBytes(byte[] value, int length) throws JMSException
   {
      return bytesMessage().readBytes(value, length);
   }

   public int readBytes(byte[] value) throws JMSException
   {
      return bytesMessage().readBytes(value);
   }

   public char readChar() throws JMSException
   {
      return bytesMessage().readChar();
   }

   public double readDouble() throws JMSException
   {
      return bytesMessage().readDouble();
   }

   public float readFloat() throws JMSException
   {
      return bytesMessage().readFloat();
   }

   public int readInt() throws JMSException
   {
      return bytesMessage().readInt();
   }

   public long readLong() throws JMSException
   {
      return bytesMessage().readLong();
   }

   public short readShort() throws JMSException
   {
      return bytesMessage().readShort();
   }

   public int readUnsignedByte() throws JMSException
   {
      return bytesMessage().readUnsignedByte();
   }

   public int readUnsignedShort() throws JMSException
   {
      return bytesMessage().readUnsignedShort();
   }

   public String readUTF() throws JMSException
   {
      return bytesMessage().readUTF();
   }

   public void reset() throws JMSException
   {
      bytesMessage().reset();
   }

   public void writeBoolean(boolean value) throws JMSException
   {
      bytesMessage().writeBoolean(value);
   }

   public void writeByte(byte value) throws JMSException
   {
      bytesMessage().writeByte(value);
   }

   public void writeBytes(byte[] value, int offset, int length) throws JMSException
   {
      bytesMessage().writeBytes(value, offset, length);
   }

   public void writeBytes(byte[] value) throws JMSException
   {
      bytesMessage().writeBytes(value);
   }

   public void writeChar(char value) throws JMSException
   {
      bytesMessage().writeChar(value);
   }

   public void writeDouble(double value) throws JMSException
   {
      bytesMessage().writeDouble(value);
   }

   public void writeFloat(float value) throws JMSException
   {
      bytesMessage().writeFloat(value);
   }

   public void writeInt(int value) throws JMSException
   {
      bytesMessage().writeInt(value);
   }

   public void writeLong(long value) throws JMSException
   {
      bytesMessage().writeLong(value);
   }

   public void writeObject(Object value) throws JMSException
   {
      bytesMessage().writeObject(value);
   }

   public void writeShort(short value) throws JMSException
   {
      bytesMessage().writeShort(value);
   }

   public void writeUTF(String value) throws JMSException
   {
      bytesMessage().writeUTF(value);
   }

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   /**
    * View the wrapped message as a BytesMessage.
    *
    * @return the message field cast to BytesMessage
    */
   private BytesMessage bytesMessage()
   {
      return (BytesMessage) message;
   }

   // Inner classes -------------------------------------------------
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsQueueReceiver.java
Index: JmsQueueReceiver.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
/**
 * Resource-adapter wrapper around a provider {@link QueueReceiver}.
 *
 * Adds the queue accessor on top of JmsMessageConsumer by forwarding it to
 * the wrapped receiver.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsQueueReceiver extends JmsMessageConsumer implements QueueReceiver
{
   // Constants -----------------------------------------------------

   // Attributes ----------------------------------------------------

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param consumer the queue receiver to delegate to
    * @param session the session handed to the superclass constructor
    */
   public JmsQueueReceiver(QueueReceiver consumer, JmsSession session)
   {
      super(consumer, session);
   }

   // Public --------------------------------------------------------

   // QueueReceiver implementation ----------------------------------

   /**
    * Get the queue of the wrapped receiver.
    *
    * @return the queue reported by the wrapped receiver
    * @throws JMSException propagated from the wrapped receiver
    */
   public Queue getQueue() throws JMSException
   {
      QueueReceiver receiver = (QueueReceiver) consumer;
      return receiver.getQueue();
   }

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsMessageListener.java
Index: JmsMessageListener.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
 * Message listener decorator: each delivered message is first passed through
 * the owning consumer's wrapMessage and only then handed to the real listener.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsMessageListener implements MessageListener
{
   // Constants -----------------------------------------------------

   // Attributes ----------------------------------------------------

   /** The listener that finally receives the wrapped message */
   MessageListener listener;

   /** The consumer used to wrap incoming messages */
   JmsMessageConsumer consumer;

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param listener the listener to forward to
    * @param consumer the consumer whose wrapMessage is applied to each message
    */
   public JmsMessageListener(MessageListener listener, JmsMessageConsumer consumer)
   {
      this.consumer = consumer;
      this.listener = listener;
   }

   // Public --------------------------------------------------------

   // MessageListener implementation --------------------------------

   /**
    * Wrap the delivered message and forward it to the real listener.
    *
    * @param message the message delivered to this listener
    */
   public void onMessage(Message message)
   {
      Message wrapped = consumer.wrapMessage(message);
      listener.onMessage(wrapped);
   }

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsConnectionMetaData.java
Index: JmsConnectionMetaData.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license. See terms of license at gnu.org.
*/
package org.jboss.resource.adapter.jms;
import java.util.Enumeration;
import java.util.Vector;
import javax.jms.ConnectionMetaData;
/**
 * This class implements javax.jms.ConnectionMetaData
 *
 * <p>All values are fixed: the class holds no instance state and every
 * getter returns a constant.
 *
 * @author Norbert Lataille (Norbert.Lataille@m4x.org)
 * @author Hiram Chirino (Norbert.Lataille@m4x.org)
 * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsConnectionMetaData implements ConnectionMetaData
{
   // Constants -----------------------------------------------------

   /** The reported JMS version string */
   private static final String JMS_VERSION = "1.1";

   /** The reported JMS major version */
   private static final int JMS_MAJOR_VERSION = 1;

   /** The reported JMS minor version */
   private static final int JMS_MINOR_VERSION = 1;

   /** The reported provider name */
   private static final String PROVIDER_NAME = "JBoss";

   /** The reported provider version string */
   private static final String PROVIDER_VERSION = "4.0";

   /** The reported provider major version */
   private static final int PROVIDER_MAJOR_VERSION = 4;

   /** The reported provider minor version */
   private static final int PROVIDER_MINOR_VERSION = 0;

   // ConnectionMetaData implementation -----------------------------

   /**
    * @return the JMS version string
    */
   public String getJMSVersion()
   {
      return JMS_VERSION;
   }

   /**
    * @return the JMS major version
    */
   public int getJMSMajorVersion()
   {
      return JMS_MAJOR_VERSION;
   }

   /**
    * @return the JMS minor version
    */
   public int getJMSMinorVersion()
   {
      return JMS_MINOR_VERSION;
   }

   /**
    * @return the provider name
    */
   public String getJMSProviderName()
   {
      return PROVIDER_NAME;
   }

   /**
    * @return the provider version string
    */
   public String getProviderVersion()
   {
      return PROVIDER_VERSION;
   }

   /**
    * @return the provider major version
    */
   public int getProviderMajorVersion()
   {
      return PROVIDER_MAJOR_VERSION;
   }

   /**
    * @return the provider minor version
    */
   public int getProviderMinorVersion()
   {
      return PROVIDER_MINOR_VERSION;
   }

   /**
    * List the JMSX property names reported by this metadata.
    *
    * @return an enumeration over a freshly built list holding
    *         "JMSXGroupID" followed by "JMSXGroupSeq"
    */
   public Enumeration getJMSXPropertyNames()
   {
      Vector names = new Vector();
      names.addElement("JMSXGroupID");
      names.addElement("JMSXGroupSeq");
      return names.elements();
   }
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsStreamMessage.java
Index: JmsStreamMessage.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.JMSException;
import javax.jms.StreamMessage;
/**
 * A wrapper for a stream message.
 *
 * <p>Every <code>StreamMessage</code> operation is forwarded to the wrapped
 * message, which is held by the superclass and cast on each call.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsStreamMessage extends JmsMessage implements StreamMessage
{
   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param message the stream message to wrap
    * @param session the session
    */
   public JmsStreamMessage(StreamMessage message, JmsSession session)
   {
      super(message, session);
   }

   // StreamMessage implementation ----------------------------------

   public boolean readBoolean() throws JMSException
   {
      return streamMessage().readBoolean();
   }

   public byte readByte() throws JMSException
   {
      return streamMessage().readByte();
   }

   public int readBytes(byte[] value) throws JMSException
   {
      return streamMessage().readBytes(value);
   }

   public char readChar() throws JMSException
   {
      return streamMessage().readChar();
   }

   public double readDouble() throws JMSException
   {
      return streamMessage().readDouble();
   }

   public float readFloat() throws JMSException
   {
      return streamMessage().readFloat();
   }

   public int readInt() throws JMSException
   {
      return streamMessage().readInt();
   }

   public long readLong() throws JMSException
   {
      return streamMessage().readLong();
   }

   public Object readObject() throws JMSException
   {
      return streamMessage().readObject();
   }

   public short readShort() throws JMSException
   {
      return streamMessage().readShort();
   }

   public String readString() throws JMSException
   {
      return streamMessage().readString();
   }

   public void reset() throws JMSException
   {
      streamMessage().reset();
   }

   public void writeBoolean(boolean value) throws JMSException
   {
      streamMessage().writeBoolean(value);
   }

   public void writeByte(byte value) throws JMSException
   {
      streamMessage().writeByte(value);
   }

   public void writeBytes(byte[] value, int offset, int length) throws JMSException
   {
      streamMessage().writeBytes(value, offset, length);
   }

   public void writeBytes(byte[] value) throws JMSException
   {
      streamMessage().writeBytes(value);
   }

   public void writeChar(char value) throws JMSException
   {
      streamMessage().writeChar(value);
   }

   public void writeDouble(double value) throws JMSException
   {
      streamMessage().writeDouble(value);
   }

   public void writeFloat(float value) throws JMSException
   {
      streamMessage().writeFloat(value);
   }

   public void writeInt(int value) throws JMSException
   {
      streamMessage().writeInt(value);
   }

   public void writeLong(long value) throws JMSException
   {
      streamMessage().writeLong(value);
   }

   public void writeObject(Object value) throws JMSException
   {
      streamMessage().writeObject(value);
   }

   public void writeShort(short value) throws JMSException
   {
      streamMessage().writeShort(value);
   }

   public void writeString(String value) throws JMSException
   {
      streamMessage().writeString(value);
   }

   // Private -------------------------------------------------------

   /**
    * View the wrapped message through its stream interface.
    *
    * @return the wrapped message cast to <code>StreamMessage</code>
    */
   private StreamMessage streamMessage()
   {
      return (StreamMessage) message;
   }
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsObjectMessage.java
Index: JmsObjectMessage.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
/**
 * A wrapper for an object message.
 *
 * <p>Both <code>ObjectMessage</code> operations are forwarded to the wrapped
 * message, which is held by the superclass and cast on each call.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsObjectMessage extends JmsMessage implements ObjectMessage
{
   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param message the object message to wrap
    * @param session the session
    */
   public JmsObjectMessage(ObjectMessage message, JmsSession session)
   {
      super(message, session);
   }

   // ObjectMessage implementation ----------------------------------

   public Serializable getObject() throws JMSException
   {
      return objectMessage().getObject();
   }

   public void setObject(Serializable object) throws JMSException
   {
      objectMessage().setObject(object);
   }

   // Private -------------------------------------------------------

   /**
    * View the wrapped message through its object-message interface.
    *
    * @return the wrapped message cast to <code>ObjectMessage</code>
    */
   private ObjectMessage objectMessage()
   {
      return (ObjectMessage) message;
   }
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsMessageConsumer.java
Index: JmsMessageConsumer.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
/**
* A wrapper for a message consumer
*
* @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
* @version $Revision: 1.1 $
*/
public class JmsMessageConsumer implements MessageConsumer
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
/** The wrapped message consumer */
MessageConsumer consumer;
/** The session for this consumer */
JmsSession session;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
/**
* Create a new wrapper
*
* @param consumer the consumer
* @param session the session
*/
public JmsMessageConsumer(MessageConsumer consumer, JmsSession session)
{
this.consumer = consumer;
this.session = session;
}
// Public --------------------------------------------------------
// MessageConsumer implementation --------------------------------
public void close() throws JMSException
{
try
{
closeConsumer();
}
finally
{
session.removeConsumer(this);
}
}
public MessageListener getMessageListener() throws JMSException
{
return consumer.getMessageListener();
}
public String getMessageSelector() throws JMSException
{
return consumer.getMessageSelector();
}
public Message receive() throws JMSException
{
Message message = consumer.receive();
if (message == null)
return null;
else
return wrapMessage(message);
}
public Message receive(long timeout) throws JMSException
{
Message message = consumer.receive(timeout);
if (message == null)
return null;
else
return wrapMessage(message);
}
public Message receiveNoWait() throws JMSException
{
Message message = consumer.receiveNoWait();
if (message == null)
return null;
else
return wrapMessage(message);
}
public void setMessageListener(MessageListener listener) throws JMSException
{
if (listener == null)
consumer.setMessageListener(null);
else
consumer.setMessageListener(wrapMessageListener(listener));
}
// Package protected ---------------------------------------------
void closeConsumer() throws JMSException
{
consumer.close();
}
Message wrapMessage(Message message)
{
if (message instanceof BytesMessage)
return new JmsBytesMessage((BytesMessage) message, session);
else if (message instanceof MapMessage)
return new JmsMapMessage((MapMessage) message, session);
else if (message instanceof ObjectMessage)
return new JmsObjectMessage((ObjectMessage) message, session);
else if (message instanceof StreamMessage)
return new JmsStreamMessage((StreamMessage) message, session);
else if (message instanceof TextMessage)
return new JmsTextMessage((TextMessage) message, session);
return new JmsMessage(message, session);
}
MessageListener wrapMessageListener(MessageListener listener)
{
return new JmsMessageListener(listener, this);
}
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsTopicSubscriber.java
Index: JmsTopicSubscriber.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
/**
* A wrapper for a topic subscriber
*
* @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
* @version $Revision: 1.1 $
*/
public class JmsTopicSubscriber extends JmsMessageConsumer implements \
TopicSubscriber {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
/**
* Create a new wrapper
*
* @param consumer the topic subscriber
* @param session the session
*/
public JmsTopicSubscriber(TopicSubscriber consumer, JmsSession session)
{
super(consumer, session);
}
// Public --------------------------------------------------------
// TopicSubscriber implementation --------------------------------
public boolean getNoLocal() throws JMSException
{
return ((TopicSubscriber) consumer).getNoLocal();
}
public Topic getTopic() throws JMSException
{
return ((TopicSubscriber) consumer).getTopic();
}
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsTextMessage.java
Index: JmsTextMessage.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
 * A wrapper for a text message.
 *
 * <p>Both <code>TextMessage</code> operations are forwarded to the wrapped
 * message, which is held by the superclass and cast on each call.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsTextMessage extends JmsMessage implements TextMessage
{
   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param message the text message to wrap
    * @param session the session
    */
   public JmsTextMessage(TextMessage message, JmsSession session)
   {
      super(message, session);
   }

   // TextMessage implementation ------------------------------------

   public String getText() throws JMSException
   {
      return textMessage().getText();
   }

   public void setText(String string) throws JMSException
   {
      textMessage().setText(string);
   }

   // Private -------------------------------------------------------

   /**
    * View the wrapped message through its text-message interface.
    *
    * @return the wrapped message cast to <code>TextMessage</code>
    */
   private TextMessage textMessage()
   {
      return (TextMessage) message;
   }
}
1.1 \
jbosscx/src/main/org/jboss/resource/adapter/jms/JmsMessage.java
Index: JmsMessage.java
===================================================================
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.resource.adapter.jms;
import java.util.Enumeration;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
/**
 * A wrapper for a message.
 *
 * <p>Every <code>javax.jms.Message</code> operation is forwarded to the
 * wrapped message. Only {@link #acknowledge()} does extra work: it calls
 * the owning session first. <code>hashCode</code>, <code>equals</code> and
 * <code>toString</code> are likewise answered by the wrapped message.
 *
 * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.1 $
 */
public class JmsMessage implements Message
{
   // Attributes ----------------------------------------------------

   /** The wrapped message that every call is forwarded to */
   Message message;

   /** The session this wrapper belongs to */
   JmsSession session;

   // Constructors --------------------------------------------------

   /**
    * Create a new wrapper
    *
    * @param message the message to wrap
    * @param session the session
    */
   public JmsMessage(Message message, JmsSession session)
   {
      this.session = session;
      this.message = message;
   }

   // Message implementation: acknowledgement and clearing ----------

   /**
    * Acknowledge the wrapped message.
    *
    * @throws JMSException from the session call or from the wrapped message
    */
   public void acknowledge() throws JMSException
   {
      // The result is not needed; the call is made only so that the session
      // can check whether it has been closed
      session.getSession();
      message.acknowledge();
   }

   public void clearBody() throws JMSException
   {
      message.clearBody();
   }

   public void clearProperties() throws JMSException
   {
      message.clearProperties();
   }

   // Message implementation: header getters ------------------------

   public String getJMSCorrelationID() throws JMSException
   {
      return message.getJMSCorrelationID();
   }

   public byte[] getJMSCorrelationIDAsBytes() throws JMSException
   {
      return message.getJMSCorrelationIDAsBytes();
   }

   public int getJMSDeliveryMode() throws JMSException
   {
      return message.getJMSDeliveryMode();
   }

   public Destination getJMSDestination() throws JMSException
   {
      return message.getJMSDestination();
   }

   public long getJMSExpiration() throws JMSException
   {
      return message.getJMSExpiration();
   }

   public String getJMSMessageID() throws JMSException
   {
      return message.getJMSMessageID();
   }

   public int getJMSPriority() throws JMSException
   {
      return message.getJMSPriority();
   }

   public boolean getJMSRedelivered() throws JMSException
   {
      return message.getJMSRedelivered();
   }

   public Destination getJMSReplyTo() throws JMSException
   {
      return message.getJMSReplyTo();
   }

   public long getJMSTimestamp() throws JMSException
   {
      return message.getJMSTimestamp();
   }

   public String getJMSType() throws JMSException
   {
      return message.getJMSType();
   }

   // Message implementation: header setters ------------------------

   public void setJMSCorrelationID(String correlationID) throws JMSException
   {
      message.setJMSCorrelationID(correlationID);
   }

   public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
   {
      message.setJMSCorrelationIDAsBytes(correlationID);
   }

   public void setJMSDeliveryMode(int deliveryMode) throws JMSException
   {
      message.setJMSDeliveryMode(deliveryMode);
   }

   public void setJMSDestination(Destination destination) throws JMSException
   {
      message.setJMSDestination(destination);
   }

   public void setJMSExpiration(long expiration) throws JMSException
   {
      message.setJMSExpiration(expiration);
   }

   public void setJMSMessageID(String id) throws JMSException
   {
      message.setJMSMessageID(id);
   }

   public void setJMSPriority(int priority) throws JMSException
   {
      message.setJMSPriority(priority);
   }

   public void setJMSRedelivered(boolean redelivered) throws JMSException
   {
      message.setJMSRedelivered(redelivered);
   }

   public void setJMSReplyTo(Destination replyTo) throws JMSException
   {
      message.setJMSReplyTo(replyTo);
   }

   public void setJMSTimestamp(long timestamp) throws JMSException
   {
      message.setJMSTimestamp(timestamp);
   }

   public void setJMSType(String type) throws JMSException
   {
      message.setJMSType(type);
   }

   // Message implementation: property queries ----------------------

   public boolean propertyExists(String name) throws JMSException
   {
      return message.propertyExists(name);
   }

   public Enumeration getPropertyNames() throws JMSException
   {
      return message.getPropertyNames();
   }

   public boolean getBooleanProperty(String name) throws JMSException
   {
      return message.getBooleanProperty(name);
   }

   public byte getByteProperty(String name) throws JMSException
   {
      return message.getByteProperty(name);
   }

   public short getShortProperty(String name) throws JMSException
   {
      return message.getShortProperty(name);
   }

   public int getIntProperty(String name) throws JMSException
   {
      return message.getIntProperty(name);
   }

   public long getLongProperty(String name) throws JMSException
   {
      return message.getLongProperty(name);
   }

   public float getFloatProperty(String name) throws JMSException
   {
      return message.getFloatProperty(name);
   }

   public double getDoubleProperty(String name) throws JMSException
   {
      return message.getDoubleProperty(name);
   }

   public String getStringProperty(String name) throws JMSException
   {
      return message.getStringProperty(name);
   }

   public Object getObjectProperty(String name) throws JMSException
   {
      return message.getObjectProperty(name);
   }

   // Message implementation: property setters ----------------------

   public void setBooleanProperty(String name, boolean value) throws JMSException
   {
      message.setBooleanProperty(name, value);
   }

   public void setByteProperty(String name, byte value) throws JMSException
   {
      message.setByteProperty(name, value);
   }

   public void setShortProperty(String name, short value) throws JMSException
   {
      message.setShortProperty(name, value);
   }

   public void setIntProperty(String name, int value) throws JMSException
   {
      message.setIntProperty(name, value);
   }

   public void setLongProperty(String name, long value) throws JMSException
   {
      message.setLongProperty(name, value);
   }

   public void setFloatProperty(String name, float value) throws JMSException
   {
      message.setFloatProperty(name, value);
   }

   public void setDoubleProperty(String name, double value) throws JMSException
   {
      message.setDoubleProperty(name, value);
   }

   public void setStringProperty(String name, String value) throws JMSException
   {
      message.setStringProperty(name, value);
   }

   public void setObjectProperty(String name, Object value) throws JMSException
   {
      message.setObjectProperty(name, value);
   }

   // Object overrides ----------------------------------------------

   /**
    * @return the hash code of the wrapped message
    */
   public int hashCode()
   {
      return message.hashCode();
   }

   /**
    * Compare by delegating to the wrapped message.
    *
    * <p>When the argument is itself a <code>JmsMessage</code> the two wrapped
    * messages are compared; any other argument (including null) is passed
    * to the wrapped message's <code>equals</code> unchanged.
    *
    * @param object the object to compare with
    * @return the result of the wrapped message's <code>equals</code>
    */
   public boolean equals(Object object)
   {
      Object other = object;
      // instanceof is false for null, so no separate null test is needed
      if (object instanceof JmsMessage)
         other = ((JmsMessage) object).message;
      return message.equals(other);
   }

   /**
    * @return the string form of the wrapped message
    */
   public String toString()
   {
      return message.toString();
   }
}
-------------------------------------------------------
SF.Net is sponsored by: Speed Start Your Linux Apps Now.
Build and deploy apps & Web services for Linux with
a free DVD software kit from IBM. Click Now!
http://ads.osdn.com/?ad_id=1356&alloc_id=3438&op=click
_______________________________________________
jboss-cvs-commits mailing list
jboss-cvs-commits@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/jboss-cvs-commits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic