[prev in list] [next in list] [prev in thread] [next in thread]
List: jboss-cvs-commits
Subject: [jboss-cvs] jboss/src/main/org/jboss/cache/invalidation/bridges JMSCacheInvalidationBridge.java JMSC
From: Sacha Labourey <slaboure () users ! sourceforge ! net>
Date: 2002-09-30 13:05:28
[Download RAW message or body]
User: slaboure
Date: 02/09/30 06:05:27
Added: src/main/org/jboss/cache/invalidation/bridges
JMSCacheInvalidationBridge.java
JMSCacheInvalidationBridgeMBean.java
JMSCacheInvalidationMessage.java
Log:
Added cache-invalidation bridge: JMS implementation
Revision Changes Path
1.1 \
jboss/src/main/org/jboss/cache/invalidation/bridges/JMSCacheInvalidationBridge.java
Index: JMSCacheInvalidationBridge.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.invalidation.bridges;
import java.io.Serializable;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.TopicPublisher;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TopicConnectionFactory;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import org.jboss.cache.invalidation.InvalidationManager;
import org.jboss.cache.invalidation.InvalidationBridgeListener;
import org.jboss.cache.invalidation.BatchInvalidation;
import org.jboss.system.ServiceMBeanSupport;
/**
* JMS implementation of a cache invalidation bridge
*
* Based on previous code of Bill Burke based on interceptors
*
* @see InvalidationManagerMBean
*
* @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
* @author <a href="mailto:bill@jboss.org">Bill Burke</a>.
* @version $Revision: 1.1 $
*
* <p><b>Revisions:</b>
*
* <p><b>28 septembre 2002 Sacha Labourey:</b>
* <ul>
* <li> First implementation </li>
* </ul>
*/
public class JMSCacheInvalidationBridge
extends ServiceMBeanSupport
implements JMSCacheInvalidationBridgeMBean,
InvalidationBridgeListener,
MessageListener
{
// Constants -----------------------------------------------------
public static final String JMS_CACHE_INVALIDATION_BRIDGE = \
"JMS_CACHE_INVALIDATION_BRIDGE";
// Attributes ----------------------------------------------------
// JMX Attributes
//
protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null;
protected org.jboss.cache.invalidation.BridgeInvalidationSubscription \
invalidationSubscription = null;
protected String invalidationManagerName = \
InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
protected boolean publishingAuthorized = false;
protected String connectionFactoryName = "java:/ConnectionFactory";
protected String topicName = "topic/JMSCacheInvalidationBridge";
protected boolean transacted = true;
protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; // AUTO_ACK by \
default
protected int propagationMode = \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION; // IN_OUT by default
protected java.rmi.dgc.VMID serviceId = new java.rmi.dgc.VMID();
protected TopicConnection conn = null;
protected TopicSession session = null;
protected Topic topic = null;
protected TopicSubscriber subscriber = null;
protected TopicPublisher pub = null;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public JMSCacheInvalidationBridge () { super (); }
// Public --------------------------------------------------------
// *MBean implementation ----------------------------------------------
public String getInvalidationManager ()
{
return this.invalidationManagerName;
}
public void setInvalidationManager (String objectName)
{
this.invalidationManagerName = objectName;
}
public String getConnectionFactoryName ()
{
return this.connectionFactoryName;
}
public void setConnectionFactoryName (String factoryName)
{
this.connectionFactoryName = factoryName;
}
public String getTopicName ()
{
return this.topicName;
}
public void setTopicName (String topicName)
{
this.topicName = topicName;
}
public boolean isTransacted ()
{
return this.transacted;
}
public void setTransacted (boolean isTransacted)
{
this.transacted = isTransacted;
}
public int getAcknowledgeMode ()
{
return this.acknowledgeMode;
}
public void setAcknowledgeMode (int ackMode)
{
if (ackMode > 3 || ackMode < 1)
throw new RuntimeException ("Value AcknowledgeMode must be between 1 and \
3");
switch (ackMode)
{
case 1: this.acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; break;
case 2: this.acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE; break;
case 3: this.acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE; break;
}
}
public int getPropagationMode ()
{
return this.propagationMode;
}
public void setPropagationMode (int propMode)
{
if (propMode > 3 || propMode < 1)
throw new RuntimeException ("Value PropagationMode must be between 1 and \
3");
this.propagationMode = propMode;
}
// MessageListener implementation ----------------------------------------------
public void onMessage(Message msg)
{
// just to make sure we are in the good mode
//
if (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
try
{
ObjectMessage objmsg = (ObjectMessage)msg;
if (!objmsg.getJMSType().equals(JMS_CACHE_INVALIDATION_BRIDGE)) return; \
JMSCacheInvalidationMessage content = \
(JMSCacheInvalidationMessage)objmsg.getObject();
// Not very efficient as the whole message must be unserialized just to \
check
// if we were the emitter. Maybe wrapping this in a byte array would be \
more efficient //
if (!content.emitter.equals (this.serviceId))
{
this.invalidationSubscription.batchInvalidate \
(content.getInvalidations ()); }
}
catch (Exception ex)
{
log.warn(ex.getMessage());
}
}
}
// InvalidationBridgeListener implementation \
----------------------------------------------
public void batchInvalidate (BatchInvalidation[] invalidations, boolean \
asynchronous) {
if ( (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) && \
this.publishingAuthorized) {
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage \
(this.serviceId, invalidations); this.sendJMSInvalidationEvent (msg);
}
}
public void invalidate (String invalidationGroupName, Serializable[] keys, \
boolean asynchronous) {
if ( (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) && \
this.publishingAuthorized) {
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
this.serviceId,
invalidationGroupName,
keys);
this.sendJMSInvalidationEvent (msg);
}
}
public void invalidate (String invalidationGroupName, Serializable key, boolean \
asynchronous) {
if ( (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) && \
this.publishingAuthorized) {
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
this.serviceId,
invalidationGroupName,
new Serializable[] {key} );
this.sendJMSInvalidationEvent (msg);
}
}
public void newGroupCreated (String groupInvalidationName)
{
// we don't manage groups dynamically, so we don't really care...
//
}
public void groupIsDropped (String groupInvalidationName)
{
// we don't manage groups dynamically, so we don't really care...
//
}
// ServiceMBeanSupport overrides \
---------------------------------------------------
protected void startService () throws Exception
{
log.info("Starting JMS cache invalidation bridge");
// Deal with the InvalidationManager first..
//
this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean)
org.jboss.system.Registry.lookup (this.invalidationManagerName);
this.invalidationSubscription = invalMgr.registerBridgeListener (this);
// deal with JMS next
//
InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup(this.connectionFactoryName);
TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
conn = tcf.createTopicConnection();
topic = (Topic) iniCtx.lookup(this.topicName);
session = conn.createTopicSession(this.transacted,
this.acknowledgeMode);
conn.start();
// Are we publisher, subscriber, or both?
//
if (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
this.subscriber = session.createSubscriber(topic);
this.subscriber.setMessageListener(this);
}
if (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) {
this.pub = session.createPublisher(topic);
this.publishingAuthorized = true;
}
}
protected void stopService ()
{
log.info ("Stoping JMS cache invalidation bridge");
try
{
if (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION) {
subscriber.close();
}
if (this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION) {
this.publishingAuthorized = false;
pub.close();
}
conn.stop();
session.close();
conn.close();
}
catch (Exception ex)
{
log.warn("Failed to stop JMS resources associated with the JMS bridge: ", \
ex); }
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
protected synchronized TopicSession getSession()
{
return this.session;
}
protected synchronized TopicPublisher getPublisher()
{
return this.pub;
}
protected void sendJMSInvalidationEvent(JMSCacheInvalidationMessage \
invalidationMsg) {
try
{
if (log.isTraceEnabled ())
log.trace("sending JMS message for cache invalidation" + \
invalidationMsg);
try
{
ObjectMessage msg = getSession().createObjectMessage();
msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE);
msg.setObject(invalidationMsg);
getPublisher().publish(msg);
}
catch (JMSException ex)
{
log.debug("failed to publish seppuku event: ", ex);
}
}
catch (Exception ex)
{
log.warn("failed to do cluster seppuku event: " , ex);
}
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
1.1 \
jboss/src/main/org/jboss/cache/invalidation/bridges/JMSCacheInvalidationBridgeMBean.java
Index: JMSCacheInvalidationBridgeMBean.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.invalidation.bridges;
/**
* Cache invalidation bridge based on JMS.
* The list of InvalidationGroup to be bridged is *not* automatically
* discovered, thus, all invalidation messages that are locally generated
* are forwarded over JMS.
* In the future, it should be possible, through a JMX attribute, to list
* the InvalidationGroup that should be included/excluded
*
* @see InvalidationManagerMBean
*
* @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
* @version $Revision: 1.1 $
*
* <p><b>Revisions:</b>
*
* <p><b>28 septembre 2002 Sacha Labourey:</b>
* <ul>
* <li> First implementation </li>
* </ul>
*/
public interface JMSCacheInvalidationBridgeMBean
{
public static final int AUTO_ACKNOWLEDGE_MODE = 1;
public static final int CLIENT_ACKNOWLEDGE_MODE = 2;
public static final int DUPS_OK_ACKNOWLEDGE_MODE = 3;
public static final int IN_OUT_BRIDGE_PROPAGATION = 1;
public static final int IN_ONLY_BRIDGE_PROPAGATION = 2;
public static final int OUT_ONLY_BRIDGE_PROPAGATION = 3;
/**
* ObjectName of the InvalidationManager to be used. Optional: in this
* case, the default InvalidationManager is used.
*/
public String getInvalidationManager ();
public void setInvalidationManager (String objectName);
/**
* JNDI name of the JMS connection factory to use for cache invalidations
*/
public String getConnectionFactoryName ();
public void setConnectionFactoryName (String factoryName);
/**
* JNDI name of the Topic to use to send/receive cache invalidations.
* Defaults to "topic/JMSCacheInvalidationBridge"
*/
public String getTopicName ();
public void setTopicName (String topicName);
/**
* Status of the JMS topic wrt transactions
*/
public boolean isTransacted ();
public void setTransacted (boolean isTransacted);
/**
* Status of the JMS topic wrt messages acknowledgement
*/
public int getAcknowledgeMode ();
public void setAcknowledgeMode (int ackMode);
/**
* Indicates if this bridge should:
* 1 - Post local invalidations to the topic and invalidate local caches with \
invalidations received on the topic
* 2 - Only invalidate local caches with invalidations received on the topic but \
not post anything on the topic
* 3 - Only post local invalidations to the topic and not listen to the Topic \
for invalidation messages
*/
public int getPropagationMode ();
public void setPropagationMode (int propagationMode);
}
1.1 \
jboss/src/main/org/jboss/cache/invalidation/bridges/JMSCacheInvalidationMessage.java \
Index: JMSCacheInvalidationMessage.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.invalidation.bridges;
import org.jboss.cache.invalidation.BatchInvalidation;
/**
* <description>
*
* @see <related>
*
* @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
* @version $Revision: 1.1 $
*
* <p><b>Revisions:</b>
*
* <p><b>30. septembre 2002 Sacha Labourey:</b>
* <ul>
* <li> First implementation </li>
* </ul>
*/
public class JMSCacheInvalidationMessage
implements java.io.Serializable
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
protected BatchInvalidation[] bis = null;
protected java.rmi.dgc.VMID emitter = null;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public JMSCacheInvalidationMessage (java.rmi.dgc.VMID source,
String groupName,
java.io.Serializable[] keys)
{
this.emitter = source;
this.bis = new BatchInvalidation[]
{
new BatchInvalidation (keys, groupName)
};
}
public JMSCacheInvalidationMessage (java.rmi.dgc.VMID source,
BatchInvalidation[] invalidations)
{
this.emitter = source;
this.bis = invalidations;
}
// Public --------------------------------------------------------
public BatchInvalidation[] getInvalidations()
{
if (this.bis == null)
this.bis = new BatchInvalidation[0];
return this.bis;
}
// Z implementation ----------------------------------------------
// Y overrides ---------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
-------------------------------------------------------
This sf.net email is sponsored by:ThinkGeek
Welcome to geek heaven.
http://thinkgeek.com/sf
_______________________________________________
jboss-cvs-commits mailing list
jboss-cvs-commits@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/jboss-cvs-commits
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic