[prev in list] [next in list] [prev in thread] [next in thread] 

List:       jboss-cvs-commits
Subject:    [jboss-cvs] jboss/src/main/org/jboss/cache/invalidation/bridges JMSCacheInvalidationBridge.java JMSC
From:       Sacha Labourey <slaboure () users ! sourceforge ! net>
Date:       2002-09-30 13:05:28
[Download RAW message or body]

  User: slaboure
  Date: 02/09/30 06:05:27

  Added:       src/main/org/jboss/cache/invalidation/bridges
                        JMSCacheInvalidationBridge.java
                        JMSCacheInvalidationBridgeMBean.java
                        JMSCacheInvalidationMessage.java
  Log:
  Added cache-invalidation bridge: JMS implementation
  
  Revision  Changes    Path
  1.1                  jboss/src/main/org/jboss/cache/invalidation/bridges/JMSCacheInvalidationBridge.java
  Index: JMSCacheInvalidationBridge.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.cache.invalidation.bridges;
  
  import java.io.Serializable;
  import java.util.HashSet;
  import java.util.HashMap;
  import java.util.Iterator;
  
  import javax.jms.MessageListener;
  import javax.jms.TopicConnection;
  import javax.jms.TopicSession;
  import javax.jms.Topic;
  import javax.jms.TopicSubscriber;
  import javax.jms.TopicPublisher;
  import javax.jms.Message;
  import javax.jms.ObjectMessage;
  import javax.jms.TopicConnectionFactory;
  import javax.jms.JMSException;
  import javax.naming.InitialContext;
  
  import org.jboss.cache.invalidation.InvalidationManager;
  import org.jboss.cache.invalidation.InvalidationBridgeListener;
  import org.jboss.cache.invalidation.BatchInvalidation;
  import org.jboss.system.ServiceMBeanSupport;
  
  /**
   * JMS implementation of a cache invalidation bridge
   *
   * Based on previous code of Bill Burke based on interceptors
   *
   * @see InvalidationManagerMBean
   *
   * @author  <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
   * @author  <a href="mailto:bill@jboss.org">Bill Burke</a>.
   * @version $Revision: 1.1 $
   *
   * <p><b>Revisions:</b>
   *
   * <p><b>28 septembre 2002 Sacha Labourey:</b>
   * <ul>
   * <li> First implementation </li>
   * </ul>
   */
  
  public class JMSCacheInvalidationBridge
     extends ServiceMBeanSupport
     implements JMSCacheInvalidationBridgeMBean, 
                InvalidationBridgeListener,
                MessageListener
  {   
     // Constants -----------------------------------------------------
     
     public static final String JMS_CACHE_INVALIDATION_BRIDGE = \
"JMS_CACHE_INVALIDATION_BRIDGE";  
     // Attributes ----------------------------------------------------
  
     // JMX Attributes
     //
     protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null;
     protected org.jboss.cache.invalidation.BridgeInvalidationSubscription \
                invalidationSubscription = null;
     protected String invalidationManagerName = \
InvalidationManager.DEFAULT_JMX_SERVICE_NAME;  
     protected boolean publishingAuthorized = false;
     protected String connectionFactoryName = "java:/ConnectionFactory";
     protected String topicName = "topic/JMSCacheInvalidationBridge";
     protected boolean transacted = true;
     protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; // AUTO_ACK by \
                default
     protected int propagationMode = \
JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION; // IN_OUT by default  
     protected  java.rmi.dgc.VMID serviceId = new java.rmi.dgc.VMID();
   
     protected TopicConnection  conn = null;
     protected TopicSession session = null;
     protected Topic topic = null;
     protected TopicSubscriber subscriber = null;
     protected TopicPublisher pub = null;
  
     // Static --------------------------------------------------------
     
     // Constructors --------------------------------------------------
     
     public JMSCacheInvalidationBridge () { super (); }
     
     // Public --------------------------------------------------------
     
     // *MBean implementation ----------------------------------------------
  
     public String getInvalidationManager ()
     {
        return this.invalidationManagerName;
     }
     
     public void setInvalidationManager (String objectName)
     {
        this.invalidationManagerName = objectName;
     }
     
     public String getConnectionFactoryName ()
     {
        return this.connectionFactoryName;
     }   
     public void setConnectionFactoryName (String factoryName)
     {
        this.connectionFactoryName = factoryName;
     }
     
     public String getTopicName ()
     {
        return this.topicName;
     }
     public void setTopicName (String topicName)
     {
        this.topicName = topicName;
     }
     
     public boolean isTransacted ()
     {
        return this.transacted;
     }
     public void setTransacted (boolean isTransacted)
     {
        this.transacted = isTransacted;
     }
     
     public int getAcknowledgeMode ()
     {
        return this.acknowledgeMode;
     }
     public void setAcknowledgeMode (int ackMode)
     {
        if (ackMode > 3 || ackMode < 1)
           throw new RuntimeException ("Value AcknowledgeMode must be between 1 and \
3");  
        switch (ackMode)
        {
           case 1: this.acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; break;
           case 2: this.acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE; break;
           case 3: this.acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE; break;
        }
     }
     
     public int getPropagationMode ()
     {
        return this.propagationMode;
     }   
     public void setPropagationMode (int propMode)
     {
        if (propMode > 3 || propMode < 1)
           throw new RuntimeException ("Value PropagationMode must be between 1 and \
3");  
        this.propagationMode = propMode;
     }
  
     // MessageListener implementation ----------------------------------------------
     
     public void onMessage(Message msg)
     {
        // just to make sure we are in the good mode
        //
        if (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
              this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)  {         
           try
           {
              ObjectMessage objmsg = (ObjectMessage)msg;
              if (!objmsg.getJMSType().equals(JMS_CACHE_INVALIDATION_BRIDGE)) return; \
                
              JMSCacheInvalidationMessage content = \
(JMSCacheInvalidationMessage)objmsg.getObject();  
              // Not very efficient as the whole message must be unserialized just to \
                check
              // if we were the emitter. Maybe wrapping this in a byte array would be \
more efficient  //
              if (!content.emitter.equals (this.serviceId))
              {
                 this.invalidationSubscription.batchInvalidate \
(content.getInvalidations ());  }
           }
           catch (Exception ex)
           {
              log.warn(ex.getMessage());
           }
        }
     }
  
     // InvalidationBridgeListener implementation \
----------------------------------------------  
     public void batchInvalidate (BatchInvalidation[] invalidations, boolean \
asynchronous)  {
        if ( (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
              this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)  && \
this.publishingAuthorized)  {         
           JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage \
(this.serviceId, invalidations);  this.sendJMSInvalidationEvent (msg);
        }
     }
  
     public void invalidate (String invalidationGroupName, Serializable[] keys, \
boolean asynchronous)  {
        if ( (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
              this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)  && \
this.publishingAuthorized)  {         
           JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
                    this.serviceId, 
                    invalidationGroupName, 
                    keys);
           this.sendJMSInvalidationEvent (msg);
        }
     }
  
     public void invalidate (String invalidationGroupName, Serializable key, boolean \
asynchronous)  {
        if ( (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
              this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)  && \
this.publishingAuthorized)  {         
           JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
                    this.serviceId, 
                    invalidationGroupName, 
                    new Serializable[] {key} );
           this.sendJMSInvalidationEvent (msg);
        }
     }
  
     public void newGroupCreated (String groupInvalidationName)
     {
        // we don't manage groups dynamically, so we don't really care...
        //
     }   
     
     public void groupIsDropped (String groupInvalidationName)
     {
        // we don't manage groups dynamically, so we don't really care...
        //
     }
  
     // ServiceMBeanSupport overrides \
---------------------------------------------------  
     protected void startService () throws Exception
     {
        log.info("Starting JMS cache invalidation bridge");
              
        // Deal with the InvalidationManager first..
        //
        this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean)
           org.jboss.system.Registry.lookup (this.invalidationManagerName);
        
        this.invalidationSubscription = invalMgr.registerBridgeListener (this);
  
        // deal with JMS next
        //
        InitialContext iniCtx = new InitialContext();
        
        Object tmp = iniCtx.lookup(this.connectionFactoryName);
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        
        topic = (Topic) iniCtx.lookup(this.topicName);
        session = conn.createTopicSession(this.transacted,
                                          this.acknowledgeMode);
        
        conn.start();
        
        // Are we publisher, subscriber, or both?
        //
        if (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
              this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)  {
           this.subscriber = session.createSubscriber(topic);     
           this.subscriber.setMessageListener(this);
        }
        
        if (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
              this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)  {
           this.pub = session.createPublisher(topic);
           this.publishingAuthorized = true;
        }
     }
     
     protected void stopService ()
     {
        log.info ("Stoping JMS cache invalidation bridge");
        try
        {
           if (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
                 this.propagationMode == \
JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)  {
              subscriber.close();
           }
  
           if (this.propagationMode == \
                JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
                 this.propagationMode == \
JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)  {
              this.publishingAuthorized = false;
              pub.close();
           }
  
           conn.stop();
           session.close();
           conn.close();
           
        }
        catch (Exception ex)
        {
           log.warn("Failed to stop JMS resources associated with the JMS bridge: ", \
ex);  }
     }
  
     // Package protected ---------------------------------------------
     
     // Protected -----------------------------------------------------
     
     protected synchronized TopicSession getSession()
     {
        return this.session;
     }
  
     protected synchronized TopicPublisher getPublisher()
     {
        return this.pub;
     }
  
     protected void sendJMSInvalidationEvent(JMSCacheInvalidationMessage \
invalidationMsg)  {
        try
        {
           if (log.isTraceEnabled ())
              log.trace("sending JMS message for cache invalidation" + \
invalidationMsg);  
           try
           {
              ObjectMessage msg = getSession().createObjectMessage();
              msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE);
              msg.setObject(invalidationMsg);
              getPublisher().publish(msg);
           }
           catch (JMSException ex)
           {
              log.debug("failed to publish seppuku event: ", ex);
           }
        }
        catch (Exception ex)
        {
           log.warn("failed to do cluster seppuku event: " , ex);
        }
     }
  
     // Private -------------------------------------------------------
     
     // Inner classes -------------------------------------------------
     
  }
  
  
  
  1.1                  jboss/src/main/org/jboss/cache/invalidation/bridges/JMSCacheInvalidationBridgeMBean.java
  
  Index: JMSCacheInvalidationBridgeMBean.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.cache.invalidation.bridges;
  
  /**
   * Management interface of the JMS-based cache invalidation bridge.
   *
   * <p>The InvalidationGroups to bridge are *not* discovered automatically:
   * every invalidation message generated locally is forwarded over JMS.
   * In the future, a JMX attribute should make it possible to list the
   * InvalidationGroups to include or exclude.
   *
   * @see org.jboss.cache.invalidation.InvalidationManagerMBean
   *
   * @author  <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
   * @version $Revision: 1.1 $
   *
   * <p><b>Revisions:</b>
   *
   * <p><b>28 September 2002 Sacha Labourey:</b>
   * <ul>
   * <li> First implementation </li>
   * </ul>
   */

  public interface JMSCacheInvalidationBridgeMBean
  {
     // Values accepted by the AcknowledgeMode attribute

     int AUTO_ACKNOWLEDGE_MODE = 1;
     int CLIENT_ACKNOWLEDGE_MODE = 2;
     int DUPS_OK_ACKNOWLEDGE_MODE = 3;

     // Values accepted by the PropagationMode attribute

     /** Post local invalidations to the topic and apply the ones received from it. */
     int IN_OUT_BRIDGE_PROPAGATION = 1;
     /** Apply invalidations received from the topic, post nothing. */
     int IN_ONLY_BRIDGE_PROPAGATION = 2;
     /** Post local invalidations to the topic, do not listen to it. */
     int OUT_ONLY_BRIDGE_PROPAGATION = 3;

     /**
      * @return ObjectName of the InvalidationManager to use
      */
     String getInvalidationManager ();

     /**
      * @param objectName ObjectName of the InvalidationManager to use. Optional:
      *        when not set, the default InvalidationManager is used.
      */
     void setInvalidationManager (String objectName);

     /**
      * @return JNDI name of the JMS connection factory used for cache invalidations
      */
     String getConnectionFactoryName ();

     /**
      * @param factoryName JNDI name of the JMS connection factory to use for
      *        cache invalidations
      */
     void setConnectionFactoryName (String factoryName);

     /**
      * @return JNDI name of the Topic on which cache invalidations are sent/received
      */
     String getTopicName ();

     /**
      * @param topicName JNDI name of the Topic to use to send/receive cache
      *        invalidations. Defaults to "topic/JMSCacheInvalidationBridge".
      */
     void setTopicName (String topicName);

     /**
      * @return status of the JMS topic with regard to transactions
      */
     boolean isTransacted ();

     /**
      * @param isTransacted status of the JMS topic with regard to transactions
      */
     void setTransacted (boolean isTransacted);

     /**
      * @return status of the JMS topic with regard to message acknowledgement
      */
     int getAcknowledgeMode ();

     /**
      * @param ackMode status of the JMS topic with regard to message acknowledgement
      */
     void setAcknowledgeMode (int ackMode);

     /**
      * Indicates what this bridge does:
      * <ul>
      * <li>1 - posts local invalidations to the topic and invalidates local
      *     caches with invalidations received on the topic</li>
      * <li>2 - only invalidates local caches with invalidations received on
      *     the topic, but posts nothing on the topic</li>
      * <li>3 - only posts local invalidations to the topic and does not listen
      *     to the topic for invalidation messages</li>
      * </ul>
      *
      * @return the current propagation mode
      */
     int getPropagationMode ();

     /**
      * @param propagationMode the propagation mode to apply, 1 to 3 as listed
      *        on {@link #getPropagationMode()}
      */
     void setPropagationMode (int propagationMode);

  }
  
  
  
  1.1                  jboss/src/main/org/jboss/cache/invalidation/bridges/JMSCacheInvalidationMessage.java
  
  Index: JMSCacheInvalidationMessage.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.cache.invalidation.bridges;
  
  import org.jboss.cache.invalidation.BatchInvalidation;
  
  /**
   * Serializable payload exchanged by JMS cache invalidation bridges: the
   * identity of the emitting bridge together with the invalidations to apply.
   *
   * @see JMSCacheInvalidationBridge
   *
   * @author  <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
   * @version $Revision: 1.1 $
   *
   * <p><b>Revisions:</b>
   *
   * <p><b>30 September 2002 Sacha Labourey:</b>
   * <ul>
   * <li> First implementation </li>
   * </ul>
   */

  public class JMSCacheInvalidationMessage
     implements java.io.Serializable
  {

     // Attributes ----------------------------------------------------

     /** Invalidations carried by this message; may be null until first read. */
     protected BatchInvalidation[] bis;

     /** Identity of the bridge that created this message. */
     protected java.rmi.dgc.VMID emitter;

     // Constructors --------------------------------------------------

     /**
      * Builds a message carrying the invalidation of some keys of a single group.
      *
      * @param source identity of the emitting bridge
      * @param groupName name of the invalidation group the keys belong to
      * @param keys the invalidated keys
      */
     public JMSCacheInvalidationMessage (java.rmi.dgc.VMID source,
                                         String groupName,
                                         java.io.Serializable[] keys)
     {
        this (source,
              new BatchInvalidation[] { new BatchInvalidation (keys, groupName) });
     }

     /**
      * Builds a message carrying an already assembled set of invalidations.
      *
      * @param source identity of the emitting bridge
      * @param invalidations the invalidations to carry; stored as given, not copied
      */
     public JMSCacheInvalidationMessage (java.rmi.dgc.VMID source,
                                         BatchInvalidation[] invalidations)
     {
        this.emitter = source;
        this.bis = invalidations;
     }

     // Public --------------------------------------------------------

     /**
      * Returns the invalidations carried by this message.
      *
      * @return the stored array, never null: a missing array is replaced by
      *         an empty one, which is then kept
      */
     public BatchInvalidation[] getInvalidations()
     {
        if (this.bis != null)
           return this.bis;

        this.bis = new BatchInvalidation[0];
        return this.bis;
     }

  }
  
  
  


-------------------------------------------------------
This sf.net email is sponsored by:ThinkGeek
Welcome to geek heaven.
http://thinkgeek.com/sf
_______________________________________________
jboss-cvs-commits mailing list
jboss-cvs-commits@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/jboss-cvs-commits


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic