[prev in list] [next in list] [prev in thread] [next in thread] 

List:       jacorb-developer
Subject:    Re: [jacorb-developer] Re: Answer to Problems with EventService
From:       Gerald Brose <brose () inf ! fu-berlin ! de>
Date:       1999-10-08 12:54:10
Message-ID: 37FDE972.6B36720E () inf ! fu-berlin ! de
[Download RAW message or body]

Oscar,

thanks a lot for your code, you are absolutely right with your
observation, and your solution is exactly how it should have been
in the first place. I have incorporated you changes into our source
tree, so from any JacORB relase  > beta12  will behave as expected.

Thanks again and regards, Gerald Brose.

Oscar Saavedra wrote:
> 
> Hello everybody:
> 
> My problem was that independently which mechanism (blocking or
> non-blocking) I used for receiving notifications, the class
> "PullConsumerDemo" in the example "Events" always answered executing the
> non-blocking mechanism.
> 
> I checked the following files : "EventChannelImpl.java",
> "ProxyPullConsumerImpl" and I noticed that the method called
> "internal_pull" and the method called "internal_try_pull" (include in
> the first file) call the method "internal_try_pull" included in the
> second file. That was why the
> example always used a non-blocking mechanism.
> 
> I attach those two files with the modifications I have included in
> order to give a solution to that problem.
> 
> ------------------------------------------------------------------------
> package jacorb.events;
> 
> /*
> *        JacORB - a free Java ORB
> *
> *   Copyright (C) 1997-98  Gerald Brose.
> *
> *   This library is free software; you can redistribute it and/or
> *   modify it under the terms of the GNU Library General Public
> *   License as published by the Free Software Foundation; either
> *   version 2 of the License, or (at your option) any later version.
> *
> *   This library is distributed in the hope that it will be useful,
> *   but WITHOUT ANY WARRANTY; without even the implied warranty of
> *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> *   Library General Public License for more details.
> *
> *   You should have received a copy of the GNU Library General Public
> *   License along with this library; if not, write to the Free
> *   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
> */
> 
> import org.omg.CosEventChannelAdmin.*;
> import org.omg.CosEventComm.*;
> import org.omg.CORBA.*;
> import java.util.*;
> import jacorb.orb.*;
> import java.net.*;
> 
> /**
> * Simple implementation of the event channel spec.
> * The event channel acts as a factory for proxy push/pull consumers/suppliers
> * and interacts with the implementation objects locally, i.e. using Java
> * references only.
> *
> * @author Joerg v. Frantzius, Rainer Lischetzki, Gerald Brose
> * @version $Id: EventChannelImpl.java,v 1.1.1.1 1999-08-05 12:22:25+02 brose Exp $
> */
> 
> public class EventChannelImpl
> extends JacORBEventChannelPOA
> {
> Vector pull_suppliers;
> Vector pull_consumers;
> Vector push_suppliers;
> Vector push_consumers;
> Vector pending_events;
> 
> org.omg.CORBA.ORB orb;
> org.omg.PortableServer.POA poa;
> 
> class Sender implements Runnable
> {
> org.omg.CORBA.Any asynchEvent;
> Vector p_suppliers= (Vector)push_suppliers.clone();
> 
> public Sender(org.omg.CORBA.Any event)
> {
> asynchEvent = event;
> }
> 
> public synchronized void run()
> {
> // hand over the event asynchronously to
> // our ProxyPushSuppliers
> for (Enumeration e = push_suppliers.elements(); e.hasMoreElements();)
> {
> ProxyPushSupplierImpl p = null;
> try
> {
> p = (ProxyPushSupplierImpl)e.nextElement();
> p.internal_push( asynchEvent );
> }
> catch ( org.omg.CosEventComm.Disconnected d)
> {
> if( p != null )
> p.disconnect_push_supplier();
> else
> System.out.println(d);
> }
> }
> }
> }
> 
> /* EventChannel */
> 
> public EventChannelImpl()
> {
> pull_suppliers = new Vector();
> pull_consumers = new Vector();
> push_suppliers = new Vector();
> push_consumers = new Vector();
> pending_events = new Vector();
> orb = org.omg.CORBA.ORB.init();
> _orb(orb);
> try
> {
> poa = org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
>  poa.the_POAManager().activate();
> }
> catch ( Exception e )
> {
> e.printStackTrace();
> }
> }
> 
> public void destroy()
> {
> // destroy lists !
> }
> 
> /* management of local proxies */
> 
> protected void disconnect_pull_consumer( ProxyPullConsumerImpl p )
> {
> pull_consumers.removeElement( p );
> }
> 
> protected void disconnect_pull_supplier( ProxyPullSupplierImpl p )
> {
> pull_suppliers.removeElement( p );
> }
> 
> protected void disconnect_push_consumer( ProxyPushConsumerImpl p )
> {
> push_consumers.removeElement( p );
> }
> 
> protected void disconnect_push_supplier( ProxyPushSupplierImpl p )
> {
> push_suppliers.removeElement( p );
> }
> 
> /* public admin interface */
> 
> public ConsumerAdmin for_consumers()
> {
> try
> {
> return ConsumerAdminHelper.narrow(poa.servant_to_reference(this));
> }
> catch (Exception e)
> {
> e.printStackTrace();
> return null;
> }
> }
> 
> public SupplierAdmin for_suppliers()
> {
> try
> {
> return SupplierAdminHelper.narrow(poa.servant_to_reference(this));
> }
> catch (Exception e)
> {
> e.printStackTrace();
> return null;
> }
> }
> 
> public synchronized ProxyPullConsumer obtain_pull_consumer()
> {
> try
> {
> ProxyPullConsumerImpl p =  new ProxyPullConsumerImpl( this );
> pull_consumers.addElement( p );
> return ProxyPullConsumerHelper.narrow(poa.servant_to_reference(p));
> }
> catch ( Exception e )
> {
> e.printStackTrace();
> return null;
> }
> }
> 
> public ProxyPullSupplier obtain_pull_supplier()
> {
> try
> {
> ProxyPullSupplierImpl p =  new ProxyPullSupplierImpl ( this );
> pull_suppliers.addElement( p );
> return ProxyPullSupplierHelper.narrow(poa.servant_to_reference(p));
> }
> catch ( Exception e )
> {
> e.printStackTrace();
> return null;
> }
> }
> 
> // SupplierAdmin-Interface
> public synchronized ProxyPushConsumer obtain_push_consumer()
> {
> try
> {
> ProxyPushConsumerImpl p = new ProxyPushConsumerImpl( this );
> push_consumers.addElement( p );
> return ProxyPushConsumerHelper.narrow(poa.servant_to_reference(p));
> }
> catch ( Exception e )
> {
> e.printStackTrace();
> return null;
> }
> }
> 
> // ConsumerAdmin-Interface
> public ProxyPushSupplier obtain_push_supplier()
> {
> try
> {
> ProxyPushSupplierImpl p = new ProxyPushSupplierImpl( this );
> push_suppliers.addElement( p );
> return ProxyPushSupplierHelper.narrow(poa.servant_to_reference(p));
> }
> catch ( Exception e )
> {
> e.printStackTrace();
> return null;
> }
> }
> 
> /* internal interface */
> 
> protected synchronized org.omg.CORBA.Any internal_pull( ProxyPullSupplierImpl p )
> throws Disconnected
> {
> Vector tmp;
> boolean found_event = false;
> /*************************/
> // Modified by Oscar Saavedra ; 7/Oct/99.
> // There is no need to use a BooleanHolder here.
> // BooleanHolder found_event_ref = new BooleanHolder();
> org.omg.CORBA.Any event = null;
> 
> while ( !found_event )
> {
> /*****************************/
> // Modified by Oscar Saavedra ; 7/Oct/99.
> // There is no need to use a BooleanHolder here.
> // found_event_ref.value = false;
> 
> // pending_events ?
> for (Enumeration e = pending_events.elements(); e.hasMoreElements()
> && !found_event;)
> {
> EventListElement evt = (EventListElement)e.nextElement();
> if (evt.is_for_proxy(p))
> {
> event = evt.event;
> evt.consumers.removeElement(p);
> if( evt.consumers.isEmpty() )
> pending_events.removeElement( evt );
> return event;
> // ? found_event = true;
> }
> }
> 
> if( !found_event && pull_consumers.isEmpty())
> {
> try
> {
> wait();     // notify for consumers_add_element and push.
> 
> /* notify can happen at two places: either a new pull supplier
> * has registered, or a new event was pushed into the channel
> */
> if( pull_consumers.isEmpty() ) // i.e. a new event has arrived
> continue;
> }
> catch  (InterruptedException e)
> {}
> }
> 
> // no pending_events, but suppliers exixt
> if ( !found_event && !pull_consumers.isEmpty() )
> {
> // pull exiting suppliers (ie proxyconsumers!)
> for (Enumeration e = pull_consumers.elements(); e.hasMoreElements() && \
> !found_event;) {
> ProxyPullConsumerImpl lieferant = (ProxyPullConsumerImpl)e.nextElement();
> /* pull suppliers might have disconnected */
> try
> {
> /*************************************/
> // Modified by Oscar Saavedra ; 7/Oct/99.
> // This is a call to a new method implemented by me in ProxyPullConsumerImpl.java
> event = lieferant.internal_pull( );
> found_event = true;
> }
> catch ( Disconnected dis)
> {
> disconnect_pull_consumer( lieferant );
> continue;
> }
> if (found_event)
> {
> // und fuer (andere!) Konsumenten (also: ProxyPullSuppliers) in den Puffer \
> schreiben. tmp = (Vector)pull_suppliers.clone();
> tmp.removeElement( p );
> EventListElement x = new EventListElement( event, tmp );
> pending_events.addElement( x );
> notify();
> }
> }
> }
> 
> // no events, but suppliers exited and pulling failed:
> if (!found_event)
> {
> try
> {
> wait(20);
> }
> catch  (InterruptedException e) {}
> }
> 
> } // end while
> return (event);
> }
> 
> protected synchronized void internal_push( org.omg.CORBA.Any event)
> throws Disconnected
> {
> Thread pushThread;
> if (!push_suppliers.isEmpty())
> {
> //  pushThread = new Thread( new Sender(event, push_suppliers ));
> pushThread = new Thread( new Sender(event ));
> pushThread.start();
> }
> 
> if (!pull_suppliers.isEmpty())
> {
> EventListElement e = new EventListElement( event, pull_suppliers );
> pending_events.addElement( e );
> }
> notify();
> }
> 
> protected synchronized org.omg.CORBA.Any internal_try_pull ( ProxyPullSupplierImpl \
> p, BooleanHolder has_event )
> throws Disconnected
> {
> Vector tmp;
> boolean found_event = false;
> BooleanHolder found_event_ref;
> org.omg.CORBA.Any event = null;
> 
> found_event_ref = new BooleanHolder();
> found_event_ref.value =false;
> for (Enumeration e = pending_events.elements(); e.hasMoreElements() && \
> !found_event;) {
> EventListElement evt = (EventListElement)e.nextElement();
> if (evt.is_for_proxy(p))
> {
> found_event = true;
> event = evt.event;
> break;
> }
> }
> 
> // no pending events. try-pull existing suppliers
> if ( !found_event && !pull_consumers.isEmpty())
> {
> for (Enumeration e = pull_consumers.elements();
> e.hasMoreElements() && !found_event;)
> {
> ProxyPullConsumerImpl lieferant = (ProxyPullConsumerImpl)e.nextElement();
> /*******************************/
> // Modified by Oscar Saavedra ; 7/Oct/99.
> // try_pull suppliers might have disconnected, so I catch that exception.
> try
> {
> event = lieferant.internal_try_pull( found_event_ref );
> }
> catch ( Disconnected dis)
> {
> disconnect_pull_consumer( lieferant );
> continue;
> }
> found_event = found_event_ref.value;
> if (found_event)
> {
> // und fuer (andere!) Konsumenten (also: ProxyPullSuppliers) in den Puffer \
> schreiben. tmp = (Vector)pull_suppliers.clone();
> tmp.removeElement( p );
> EventListElement x = new EventListElement( event, tmp );
> pending_events.addElement( x );
> notify();
> break;
> }
> }
> }    // endif
> 
> has_event.value = found_event;
> return (event);
> }
> 
> }
> 
> ------------------------------------------------------------------------
> package jacorb.events;
> 
> /*
> *        JacORB - a free Java ORB
> *
> *   Copyright (C) 1997-98  Gerald Brose.
> *
> *   This library is free software; you can redistribute it and/or
> *   modify it under the terms of the GNU Library General Public
> *   License as published by the Free Software Foundation; either
> *   version 2 of the License, or (at your option) any later version.
> *
> *   This library is distributed in the hope that it will be useful,
> *   but WITHOUT ANY WARRANTY; without even the implied warranty of
> *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> *   Library General Public License for more details.
> *
> *   You should have received a copy of the GNU Library General Public
> *   License along with this library; if not, write to the Free
> *   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
> */
> 
> // Joerg v. Frantzius, Rainer Lischetzki, Gerald Brose 1997
> 
> import jacorb.orb.*;
> 
> public class ProxyPullConsumerImpl
> extends org.omg.CosEventChannelAdmin.ProxyPullConsumerPOA
> {
> private EventChannelImpl myEventChannel;
> private org.omg.CosEventComm.PullSupplier myPullSupplier;
> 
> protected ProxyPullConsumerImpl ( EventChannelImpl ec )
> {
> myEventChannel = ec;
> _orb( org.omg.CORBA.ORB.init());
> }
> 
> //  ProxyPullConsumer Interface:
> 
> public void connect_pull_supplier ( org.omg.CosEventComm.PullSupplier pull_supplier \
> ) {
> myPullSupplier = pull_supplier;
> synchronized( myEventChannel )
> {
> myEventChannel.notify();
> }
> }
> 
> //  PullConsumer Interface:
> 
> public void disconnect_pull_consumer()
> {
> myEventChannel.disconnect_pull_consumer ( this );
> }
> 
> // Methods called by the EventChannel:
> 
> protected org.omg.CORBA.Any internal_try_pull ( org.omg.CORBA.BooleanHolder \
> has_event ) throws org.omg.CosEventComm.Disconnected
> {
> try
> {
> return myPullSupplier.try_pull( has_event );
> }
> catch( Exception e)
> {
> throw new org.omg.CosEventComm.Disconnected();
> }
> }
> 
> /***********************/
> // Modified by Oscar Saavedra : 7/Oct/99.
> // This is the new method I have implemented and added.
> protected org.omg.CORBA.Any internal_pull (  )
> throws org.omg.CosEventComm.Disconnected
> {
> try
> {
> return myPullSupplier.pull( );
> }
> catch( Exception e)
> {
> throw new org.omg.CosEventComm.Disconnected();
> }
> }
> }

--
Gerald Brose,                       Mail:       brose@inf.fu-berlin.de
FU Berlin        (for PGP key see:) http://www.inf.fu-berlin.de/~brose
Institut f. Informatik              Ph-one:        (++49-30) 838-75112
Berlin, Germany                     Ph-ax:         (++49-30) 838-75109


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic