[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r1461839 [1/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/ap
From:       chirino () apache ! org
Date:       2013-03-27 20:20:32
Message-ID: 20130327202032.C4BBE23888CD () eris ! apache ! org
[Download RAW message or body]

Author: chirino
Date: Wed Mar 27 20:20:31 2013
New Revision: 1461839

URL: http://svn.apache.org/r1461839
Log:
Starting to port the MQTT module to be a pure java impl, just to see how much effort \
that would take.

Added:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java
                
      - copied, changed from r1459409, \
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala
 Removed:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala
 Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
  activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
  activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
  activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
  activemq/activemq-apollo/trunk/apollo-scala/pom.xml
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala


Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
                
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/mai \
n/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
 ==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala \
                (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala \
Wed Mar 27 20:20:31 2013 @@ -40,6 +40,12 @@ abstract class Connection() extends Base
   private var _dispatch_queue = createQueue()
   def dispatch_queue = _dispatch_queue
 
+  def _set_dispatch_queue(next_queue:DispatchQueue, on_complete:Task) {
+    set_dispatch_queue(next_queue) {
+      on_complete.run()
+    }
+  }
+
   def set_dispatch_queue(next_queue:DispatchQueue)(on_complete: =>Unit) {
     _dispatch_queue {
       if(transport!=null) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
                
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/mai \
n/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
 ==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala \
                (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala \
Wed Mar 27 20:20:31 2013 @@ -74,6 +74,8 @@ trait DeliveryConsumer extends Retained 
   def is_persistent:Boolean
 }
 
+abstract class AbstractRetainedDeliveryConsumer extends BaseRetained with \
DeliveryConsumer +
 class DeliveryConsumerFilter(val next:DeliveryConsumer) extends DeliveryConsumer {
   override def browser: Boolean = next.browser
   override def close_on_drain: Boolean = next.close_on_drain

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
                
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/mai \
n/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
 ==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala \
                (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala \
Wed Mar 27 20:20:31 2013 @@ -331,13 +331,22 @@ trait SessionSink[T] extends Sink[T] {
 }
 
 trait SessionSinkFilter[T] extends SessionSink[T] with SinkFilter[T] {
-  def downstream:SessionSink[T]
+  def downstream: SessionSink[T]
   def enqueue_item_counter = downstream.enqueue_item_counter
   def enqueue_size_counter = downstream.enqueue_size_counter
   def enqueue_ts = downstream.enqueue_ts
   def remaining_capacity = downstream.remaining_capacity
 }
 
+abstract class AbstractSessionSinkFilter[T] extends SessionSink[T] with \
SinkFilter[T] { +  def downstream:Sink[T] = downstream_session_sink
+  def downstream_session_sink: SessionSink[T]
+  def enqueue_item_counter = downstream_session_sink.enqueue_item_counter
+  def enqueue_size_counter = downstream_session_sink.enqueue_size_counter
+  def enqueue_ts = downstream_session_sink.enqueue_ts
+  def remaining_capacity = downstream_session_sink.remaining_capacity
+}
+
 /**
  *  <p>
  * A SinkMux multiplexes access to a target sink so that multiple

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java
                
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java?rev=1461839&view=auto
 ==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java \
                (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java \
Wed Mar 27 20:20:31 2013 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.DestinationParser;
+import org.apache.activemq.apollo.broker.protocol.Protocol;
+import org.apache.activemq.apollo.broker.protocol.ProtocolHandler;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttProtocol extends MqttProtocolCodecFactory implements Protocol {
+
+    static final DestinationParser destination_parser = new DestinationParser();
+    static final AsciiBuffer PROTOCOL_ID = new AsciiBuffer(id);
+    static {
+        destination_parser.queue_prefix_$eq(null);
+        destination_parser.topic_prefix_$eq(null);
+        destination_parser.path_separator_$eq("/");
+        destination_parser.any_child_wildcard_$eq("+");
+        destination_parser.any_descendant_wildcard_$eq("#");
+        destination_parser.dsub_prefix_$eq(null);
+        destination_parser.temp_queue_prefix_$eq(null);
+        destination_parser.temp_topic_prefix_$eq(null);
+        destination_parser.destination_separator_$eq(null);
+        destination_parser.regex_wildcard_end_$eq(null);
+        destination_parser.regex_wildcard_end_$eq(null);
+        destination_parser.part_pattern_$eq(null);
+    }
+
+
+    @Override
+    public ProtocolHandler createProtocolHandler() {
+        return new MqttProtocolHandler();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java
                
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/ \
scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java?rev=1461839&view=auto
 ==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java \
                (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java \
Wed Mar 27 20:20:31 2013 @@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.Broker$;
+import org.apache.activemq.apollo.broker.Connector;
+import org.apache.activemq.apollo.broker.protocol.ProtocolCodecFactory;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.ProtocolCodec;
+import org.fusesource.mqtt.codec.MQTTProtocolCodec;
+
+/**
+ * Creates MqttCodec objects that encode/decode the
+ * <a href="http://activemq.apache.org/mqtt/">Mqtt</a> protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttProtocolCodecFactory implements ProtocolCodecFactory.Provider {
+
+    //
+    // An MQTT CONNECT message has between 10-13 bytes:
+    // Message Type     : 0x10 @ [0]
+    // Remaining Length : Byte{1-4} @ [1]
+    // Protocol Name    : 0x00 0x06 'M' 'Q' 'I' 's' 'd' 'p' @ [2|3|4|5]
+    //
+    static final String id = "mqtt";
+    static final Buffer HEAD_MAGIC = new Buffer(new byte []{ 0x10 });
+    static final Buffer TAIL_MAGIC = new Buffer(new byte []{ 0x00, 0x06, 'M', 'Q', \
'I', 's', 'd', 'p'}); +
+    @Override
+    public String id() {
+        return id;
+    }
+
+    @Override
+    public ProtocolCodec createProtocolCodec(Connector connector) {
+        MQTTProtocolCodec rc = new MQTTProtocolCodec();
+        rc.setBufferPools(Broker$.MODULE$.buffer_pools());
+        return rc;
+    }
+
+    @Override
+    public boolean isIdentifiable() {
+        return true;
+    }
+
+    @Override
+    public int maxIdentificaionLength() {
+        return 13;
+    }
+
+    @Override
+    public boolean matchesIdentification(Buffer header) {
+        if (header.length < 10) {
+          return false;
+        } else {
+          return header.startsWith(HEAD_MAGIC) && header.indexOf(TAIL_MAGIC, 2) < 6;
+        }
+    }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
                
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/ \
scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
 ==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala \
                (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala \
Wed Mar 27 20:20:31 2013 @@ -46,12 +46,17 @@ import scala.Some
 import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState
 import org.apache.activemq.apollo.broker.SubscriptionAddress
 
+case class Request(id:Short, message:MessageSupport.Message, \
ack:UnitFn1[DeliveryResult]) { +  val frame = if(message==null) null else \
message.encode() +  var delivered = false
+}
+
+object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+  def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+}
+
 object MqttProtocolHandler extends Log {
   
-  case class Request(id:Short, message:MessageSupport.Message, \
                ack:(DeliveryResult)=>Unit) {
-    val frame = if(message==null) null else message.encode()
-    var delivered = false
-  }
 
   def received[T](value:T):T = {
     trace("received: %s", value)
@@ -60,9 +65,6 @@ object MqttProtocolHandler extends Log {
 
   val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
 
-  object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
-    def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
-  }
 }
 
 /**
@@ -202,6 +204,7 @@ class MqttProtocolHandler extends Protoc
   /////////////////////////////////////////////////////////////////////
 
   var status = WAITING_ON_CLIENT_REQUEST
+  def _suspend_read(reason: String) = suspend_read(reason)
   def suspend_read(reason: => String) = {
     status = reason _
     connection.transport.suspendRead
@@ -414,15 +417,15 @@ class MqttProtocolHandler extends Protoc
   // Other msic bits.
   //
   /////////////////////////////////////////////////////////////////////
-  var messages_sent = 0L
-  var messages_received = 0L
+  val messages_sent = new LongCounter()
+  var messages_received = new LongCounter()
   var subscription_count = 0
 
   override def create_connection_status = {
     var rc = new MqttConnectionStatusDTO
     rc.protocol_version = "3.1"
-    rc.messages_sent = messages_sent
-    rc.messages_received = messages_received
+    rc.messages_sent = messages_sent.get()
+    rc.messages_received = messages_received.get
     rc.subscription_count = subscription_count
     rc.waiting_on = status()
     rc
@@ -446,17 +449,17 @@ object MqttSessionManager {
 
   class SessionState {
     var durable_sub:SubscriptionAddress = _
-    val subscriptions = HashMap[UTF8Buffer, (Topic, BindAddress)]()
+    val subscriptions = new java.util.HashMap[UTF8Buffer, (Topic, BindAddress)]()
     val received_message_ids: HashSet[Short] = new HashSet[Short]
 
     trait StorageStrategy {
-      def update(cb: =>Unit)
-      def destroy(cb: =>Unit)
+      def update(cb: Task)
+      def destroy(cb: Task)
       def create(store:Store, client_id:UTF8Buffer)
     }
     case class NoopStrategy() extends StorageStrategy {
-      def update(cb: =>Unit) = { cb }
-      def destroy(cb: =>Unit) { cb }
+      def update(cb: Task) = { cb.run() }
+      def destroy(cb: Task) { cb.run() }
       def create(store:Store, client_id:UTF8Buffer) = {
         if(store!=null)
           strategy = StoreStrategy(store, client_id)
@@ -465,12 +468,14 @@ object MqttSessionManager {
 
     case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends \
StorageStrategy {  val session_key = new UTF8Buffer("mqtt:"+client_id)
-      def update(cb: =>Unit) = {
+      def update(cb: Task) = {
         val uow = store.create_uow
         val session_pb = new SessionPB.Bean
         session_pb.setClientId(client_id)
         received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
-        subscriptions.values.foreach { case (topic, address) =>
+
+        import collection.JavaConversions._
+        for( (topic, address)  <- subscriptions.values) {
           val topic_pb = new TopicPB.Bean
           topic_pb.setName(topic.name())
           topic_pb.setQos(topic.qos().ordinal())
@@ -482,20 +487,20 @@ object MqttSessionManager {
         val current = getCurrentQueue
         uow.on_complete {
           current {
-            cb
+            cb.run()
           }
         }
         uow.release
       }
 
-      def destroy(cb: =>Unit) {
+      def destroy(cb: Task) {
         val uow = store.create_uow
         uow.put(session_key, null)
         val current = getCurrentQueue
         uow.on_complete {
           current {
             strategy = NoopStrategy()
-            cb
+            cb.run()
           }
         }
         uow.release
@@ -564,7 +569,7 @@ object MqttSessionManager {
           } else {
             host_state.session_states.getOrElseUpdate(client_id, new SessionState())
           }
-          val assignment = MqttSession(host_state, client_id, state)
+          val assignment = new MqttSession(host_state, client_id, state)
           assignment.connect(handler)
           host_state.sessions.put(client_id, assignment)
       }
@@ -583,724 +588,724 @@ object MqttSessionManager {
   }
 }
 
-/**
- * An MqttSession can be switch from one connection/protocol handler to another,
- * but it will only be associated with one at a time. An MqttSession tracks
- * the state of the communication with a client.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-case class MqttSession(host_state:HostState, client_id:UTF8Buffer, \
                session_state:SessionState) {
-  
-  import MqttProtocolHandler._
-
-  def host = host_state.host
-
-  val queue = createQueue("mqtt: "+client_id)
-  var manager_disconnected = false
-
-  var handler:Option[MqttProtocolHandler] = None
-  var security_context:SecurityContext = _
-  var clean_session = false
-  var connect_message:CONNECT = _
-  var destination_parser = MqttProtocol.destination_parser
-
-  def connect(next:MqttProtocolHandler):Unit = queue {
-    if(manager_disconnected) {
-      // we are not the assignment anymore.. go to the session manager
-      // again to setup a new session.
-      MqttSessionManager.attach(host, client_id, next)
-    } else {
-      
-      // so that we don't switch again until this current switch completes
-      queue.suspend()
-      if( handler != None ) {
-        detach
-        handler = None
-      }
-      queue {
-        handler=Some(next)
-        attach
-      }
-      
-      // switch the connection to the session queue..
-      next.connection.set_dispatch_queue(queue) {
-        queue.resume()
-      }
-    }
-  }
-
-  def disconnect(prev:MqttProtocolHandler) = queue {
-    if( handler==Some(prev) ) {
-      MqttSessionManager.remove(host_state, client_id)
-      manager_disconnected = true
-      detach
-      handler = None
-    }
-  }
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits that deal with connections attaching/detaching from the session
-  //
-  /////////////////////////////////////////////////////////////////////
-  def attach = {
-    queue.assertExecuting()
-    val h = handler.get
-    clean_session = h.connect_message.cleanSession()
-    security_context = h.security_context
-    h.command_handler = on_transport_command _
-    destination_parser = h.destination_parser
-    mqtt_consumer.consumer_sink.downstream = Some(h.sink_manager.open)
-
-    def ack_connect = {
-      queue.assertExecuting()
-      connect_message = h.connect_message
-      val connack = new CONNACK
-      connack.code(CONNECTION_ACCEPTED)
-      send(connack)
-    }
-
-    if( !clean_session ) {
-      // Setup the previous subscriptions..
-      session_state.strategy.create(host.store, client_id)
-      if( !session_state.subscriptions.isEmpty ) {
-        h.suspend_read("subscribing")
-        subscribe(session_state.subscriptions.map(_._2._1)) {
-          h.resume_read()
-          h.queue {
-            ack_connect
-          }
-        }
-      } else {
-        ack_connect
-      }
-    } else {
-      // do we need to clear the received ids?
-      // durable_session_state.received_message_ids.clear()
-      session_state.subscriptions.clear()
-      if( session_state.durable_sub !=null ) {
-        var addresses = Array(session_state.durable_sub)
-        session_state.durable_sub = null
-        host.dispatch_queue {
-          host.router.delete(addresses, security_context)
-        }
-      }
-      session_state.strategy.destroy {
-        ack_connect
-      }
-    }
-
-  }
-
-  def detach:Unit = {
-    queue.assertExecuting()
-
-    if(!producerRoutes.isEmpty) {
-      import collection.JavaConversions._
-      val routes = producerRoutes.values.toSeq.toArray
-      host.dispatch_queue {
-        routes.foreach { route=>
-          host.router.disconnect(Array(route.address), route)
-        }
-      }
-      producerRoutes.clear
-    }
-
-    if( clean_session ) {
-      if(!mqtt_consumer.addresses.isEmpty) {
-        var addresses = mqtt_consumer.addresses.keySet.toArray
-        host.dispatch_queue {
-          host.router.unbind(addresses, mqtt_consumer, false , security_context)
-        }
-        mqtt_consumer.addresses.clear()
-      }
-      session_state.subscriptions.clear()
-    } else {
-      if(session_state.durable_sub!=null) {
-        var addresses = Array(session_state.durable_sub)
-        host.dispatch_queue {
-          host.router.unbind(addresses, mqtt_consumer, false , security_context)
-        }
-        mqtt_consumer.addresses.clear()
-        session_state.durable_sub = null
-      }
-    }
-
-    in_flight_publishes.values.foreach { request =>
-      if( request.ack!=null ) {
-        if(request.delivered) {
-          request.ack(Delivered)
-        } else {
-          request.ack(Undelivered)
-        }
-      }
-    }
-    in_flight_publishes.clear()
-    
-    handler.get.sink_manager.close(mqtt_consumer.consumer_sink.downstream.get, \
                (request)=>{})
-    mqtt_consumer.consumer_sink.downstream = None
-
-    handler.get.on_transport_disconnected()
-  }
-
-  def decode_destination(value:UTF8Buffer):SimpleAddress = {
-    val rc = destination_parser.decode_single_destination(value.toString, (name)=>{
-      SimpleAddress("topic", destination_parser.decode_path(name))
-    })
-    if( rc==null ) {
-      handler.foreach(_.die("Invalid mqtt destination name: "+value))
-    }
-    rc
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits that deal with assigning message ids to QoS > 0 requests
-  // and tracking those requests so that they can get replayed on a
-  // reconnect.
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  var in_flight_publishes = HashMap[Short, Request]()
-
-  def send(message: MessageSupport.Message): Unit = {
-    queue.assertExecuting()
-    handler.foreach(_.connection_sink.offer(Request(0, message, null)))
-  }
-
-  def publish_completed(id: Short): Unit = {
-    queue.assertExecuting()
-    in_flight_publishes.remove(id) match {
-      case Some(request) =>
-        if ( request.ack != null ) {
-          request.ack(Consumed)
-        }
-      case None =>
-        // It's possible that on a reconnect, we get an ACK
-        // in for message that was not dispatched yet. store
-        // a place holder so we ack it upon the dispatch 
-        // attempt.
-        in_flight_publishes.put(id, Request(id, null, null))
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits that deal with processing new messages from the client.
-  //
-  /////////////////////////////////////////////////////////////////////
-  def on_transport_command(command:AnyRef):Unit = command match {
-    case command:MQTTFrame=>
-      
-      command.messageType() match {
-
-        case PUBLISH.TYPE =>
-          on_mqtt_publish(received(new PUBLISH().decode(command)))
-
-        // This follows a Publish with QoS EXACTLY_ONCE
-        case PUBREL.TYPE =>
-          var ack = received(new PUBREL().decode(command))
-          // TODO: perhaps persist the processed list.. otherwise
-          // we can't filter out dups after a broker restart.
-          session_state.received_message_ids.remove(ack.messageId)
-          session_state.strategy.update {
-            send(new PUBCOMP().messageId(ack.messageId))
-          }
-
-        case SUBSCRIBE.TYPE =>
-          on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)))
-
-        case UNSUBSCRIBE.TYPE =>
-          on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)))
-
-        // AT_LEAST_ONCE ack flow for a client subscription
-        case PUBACK.TYPE =>
-          val ack = received(new PUBACK().decode(command))
-          publish_completed(ack.messageId)
-
-        // EXACTLY_ONCE ack flow for a client subscription
-        case PUBREC.TYPE =>
-          val ack = received(new PUBREC().decode(command))
-          send(new PUBREL().messageId(ack.messageId))
-
-        case PUBCOMP.TYPE =>
-          val ack: PUBCOMP = received(new PUBCOMP().decode(command))
-          publish_completed(ack.messageId)
-
-        case PINGREQ.TYPE =>
-          received(new PINGREQ().decode(command))
-          send(new PINGRESP())
-
-        case DISCONNECT.TYPE =>
-          received(new DISCONNECT())
-          MqttSessionManager.disconnect(host_state, client_id, handler.get)
-
-        case _ =>
-          handler.get.die("Invalid MQTT message type: "+command.messageType());
-      }
-    case "failure" =>
-      // Publish the client's will
-      publish_will {
-        // then disconnect him.
-        MqttSessionManager.disconnect(host_state, client_id, handler.get)
-      }
-
-    case _=>
-      handler.get.die("Internal Server Error: unexpected mqtt command: \
                "+command.getClass);
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits that deal with processing PUBLISH messages
-  //
-  /////////////////////////////////////////////////////////////////////
-  var producerRoutes = new LRUCache[UTF8Buffer, MqttProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[UTF8Buffer, MqttProducerRoute]) = {
-      host.dispatch_queue {
-        host.router.disconnect(Array(eldest.getValue.address), eldest.getValue)
-      }
-    }
-  }
-  case class MqttProducerRoute(address:SimpleAddress, handler:MqttProtocolHandler) \
                extends DeliveryProducerRoute(host.router) {
-    override def send_buffer_size = handler.codec.getReadBufferSize
-    override def connection = Some(handler.connection)
-    override def dispatch_queue = queue
-
-    var suspended = false
-
-    refiller = ^ {
-      if( suspended ) {
-        suspended = false
-        handler.resume_read
-      }
-    }
-
-  }
-
-  def on_mqtt_publish(publish:PUBLISH):Unit = {
-
-    if( (publish.qos eq EXACTLY_ONCE) && \
                session_state.received_message_ids.contains(publish.messageId)) {
-      val response = new PUBREC
-      response.messageId(publish.messageId)
-      send(response)
-      return
-    }
-
-    handler.get.messages_received += 1
-
-    queue.assertExecuting()
-    producerRoutes.get(publish.topicName()) match {
-      case null =>
-        // create the producer route...
-
-        val destination = decode_destination(publish.topicName())
-        val route = MqttProducerRoute(destination, handler.get)
-
-        // don't process commands until producer is connected...
-        route.handler.suspend_read("route publish lookup")
-        host.dispatch_queue {
-          host.router.connect(Array(destination), route, security_context)
-          queue {
-            // We don't care if we are not allowed to send..
-            if (!route.handler.connection.stopped) {
-              route.handler.resume_read
-              producerRoutes.put(publish.topicName(), route)
-              send_via_route(route, publish)
-            }
-          }
-        }
-
-      case route =>
-        // we can re-use the existing producer route
-        send_via_route(route, publish)
-    }
-  }
-
-  def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
-    queue.assertExecuting()
-
-    def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
-      val response = new PUBACK
-      response.messageId(publish.messageId)
-      send(response)
-    }
-
-    def exactly_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
-      queue.assertExecuting()
-      // TODO: perhaps persist the processed list..
-      session_state.received_message_ids.add(publish.messageId)
-      session_state.strategy.update {
-        val response = new PUBREC
-        response.messageId(publish.messageId)
-        send(response)
-      }
-    }
-
-    val ack = publish.qos match {
-      case AT_LEAST_ONCE => at_least_once_ack _
-      case EXACTLY_ONCE => exactly_once_ack _
-      case AT_MOST_ONCE => null
-    }
-
-    if( !route.targets.isEmpty ) {
-      val delivery = new Delivery
-      delivery.message = RawMessage(publish.payload)
-      delivery.persistent = publish.qos().ordinal() > 0
-      delivery.size = publish.payload.length
-      delivery.ack = ack
-      if( publish.retain() ) {
-        if( delivery.size == 0 ) {
-          delivery.retain = RetainRemove
-        } else {
-          delivery.retain = RetainSet
-        }
-      }
-
-      // routes can always accept at least 1 delivery...
-      assert( !route.full )
-      route.offer(delivery)
-      if( route.full ) {
-        // but once it gets full.. suspend to flow control the producer.
-        route.suspended = true
-        handler.get.suspend_read("blocked sending to: \
                "+route.overflowSessions.mkString(", "))
-      }
-
-    } else {
-      ack(null, null)
-    }
-  }
-
-  
-  //
-  def publish_will(complete_close: =>Unit) = {
-    if(connect_message!=null) {
-      if( connect_message.willTopic()==null ) {
-        complete_close
-      } else {
-  
-        val destination = decode_destination(connect_message.willTopic())
-        val prodcuer = new DeliveryProducerRoute(host.router) {
-          override def send_buffer_size = 1024*64
-          override def connection = handler.map(_.connection)
-          override def dispatch_queue = queue
-          refiller = NOOP
-        }
-  
-        host.dispatch_queue {
-          host.router.connect(Array(destination), prodcuer, security_context)
-          queue {
-            if(prodcuer.targets.isEmpty) {
-              complete_close
-            } else {
-              val delivery = new Delivery
-              delivery.message = RawMessage(connect_message.willMessage())
-              delivery.size = connect_message.willMessage().length
-              delivery.persistent = connect_message.willQos().ordinal() > 0
-              if( connect_message.willRetain() ) {
-                if( delivery.size == 0 ) {
-                  delivery.retain = RetainRemove
-                } else {
-                  delivery.retain = RetainSet
-                }
-              }
-  
-              delivery.ack = (x,y) => {
-                host.dispatch_queue {
-                  host.router.disconnect(Array(destination), prodcuer)
-                }
-                complete_close
-              }
-              handler.get.messages_received += 1
-              prodcuer.offer(delivery)
-            }
-          }
-        }
-      }
-    }
-  }
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits that deal with subscriptions
-  //
-  /////////////////////////////////////////////////////////////////////
-  
-  def on_mqtt_subscribe(sub:SUBSCRIBE):Unit = {
-    subscribe(sub.topics()) {
-      queue {
-        session_state.strategy.update {
-          val suback = new SUBACK
-          suback.messageId(sub.messageId())
-          suback.grantedQos(sub.topics().map(_.qos().ordinal().toByte))
-          send(suback)
-        }
-      }
-    }
-  }
-  
-  def subscribe(topics:Traversable[Topic])(on_subscribed: => Unit):Unit = {
-    var addresses:Array[_ <: BindAddress] = topics.toArray.map { topic =>
-      var address:BindAddress = decode_destination(topic.name)
-      session_state.subscriptions += topic.name -> (topic, address)
-      mqtt_consumer.addresses += address -> topic.qos
-      if(PathParser.containsWildCards(address.path)) {
-        mqtt_consumer.wildcards.put( address.path, topic.qos() )
-      }
-      address
-    }
-
-    handler.get.subscription_count = mqtt_consumer.addresses.size
-
-    addresses = if( clean_session ) {
-      addresses
-    } else {
-      session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
                null, mqtt_consumer.addresses.keySet.toArray)
-      Array(session_state.durable_sub)
-    }      
-
-    host.dispatch_queue {
-      addresses.foreach { address=>
-        host.router.bind(Array[BindAddress](address), mqtt_consumer, \
                security_context) { result =>
-        // MQTT ignores subscribe failures.
-        }
-      }
-      on_subscribed
-    }
-
-  }
-
-  def on_mqtt_unsubscribe(unsubscribe:UNSUBSCRIBE):Unit = {
-
-    val addresses:Array[_ <: BindAddress] = unsubscribe.topics.flatMap { topic =>
-      session_state.subscriptions.remove(topic).map { case (topic, address)=>
-        mqtt_consumer.addresses.remove(address)
-        if(PathParser.containsWildCards(address.path)) {
-          mqtt_consumer.wildcards.remove(address.path, topic.qos)
-        }
-        address
-      }
-    }
-
-    handler.get.subscription_count = mqtt_consumer.addresses.size
-
-    if(!clean_session) {
-      session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
                null, mqtt_consumer.addresses.keySet.toArray)
-    }
-    
-    host.dispatch_queue {
-      if(clean_session) {
-        host.router.unbind(addresses, mqtt_consumer, false, security_context)
-      } else {
-        if( mqtt_consumer.addresses.isEmpty ) {
-          host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, true, \
                security_context)
-          session_state.durable_sub = null
-        } else {
-          host.router.bind(Array(session_state.durable_sub), mqtt_consumer, \
                security_context) { result =>
-          }
-        }
-      }
-      queue {
-        session_state.strategy.update {
-          val ack = new UNSUBACK
-          ack.messageId(unsubscribe.messageId())
-          send(ack)
-        }
-      }
-    }
-
-  }
-
-  var publish_body = false
-
-  lazy val mqtt_consumer = new MqttConsumer
-  class MqttConsumer extends BaseRetained with DeliveryConsumer {
-    
-    override def toString = "mqtt client:"+client_id+" remote address: \
                "+security_context.remote_address
-
-    val addresses = HashMap[BindAddress, QoS]()
-    val wildcards = new PathMap[QoS]()
-
-    val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, \
                Int)] {
-      def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
-        if( previous == null ) {
-          event
-        } else {
-          (previous._1+event._1, previous._2+event._2)
-        }
-      }
-      def mergeEvents(previous:(Int, Int), events:(Int, Int)) = mergeEvent(previous, \
                events)
-    }, dispatch_queue)
-
-    credit_window_source.setEventHandler(^{
-      val data = credit_window_source.getData
-      credit_window_filter.credit(data._1, data._2)
-    });
-    credit_window_source.resume
-
-    val consumer_sink = new MutableSink[Request]()
-    consumer_sink.downstream = None
-
-    var next_seq_id = 1L
-    def get_next_seq_id = {
-      val rc = next_seq_id
-      next_seq_id += 1
-      rc
-    }
-
-    def to_message_id(value:Long):Short = (
-        0x8000 | // MQTT message ids cannot be zero, so we always set the highest \
                bit.
-        (value & 0x7FFF) // the lower 15 bits come for the original seq id.
-      ).toShort
-
-    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], \
                Delivery)](consumer_sink.flatMap{ event =>
-      queue.assertExecuting()
-      val (session, delivery) = event
-
-      session_manager.delivered(session, delivery.size)
-      
-      // Look up which QoS we need to send this message with..
-      var topic = delivery.sender.head.simple
-      import collection.JavaConversions._
-      addresses.get(topic).orElse(wildcards.get(topic.path).headOption) match {
-          
-        case None =>
-          // draining messages after an un-subscribe
-          acked(delivery, Consumed)
-          None
-          
-        case Some(qos) =>
-
-          // Convert the Delivery into a Request
-          var publish = new PUBLISH
-          publish.topicName(new \
                UTF8Buffer(destination_parser.encode_destination(Array(delivery.sender.head))))
                
-          if( delivery.redeliveries > 0) {
-            publish.dup(true)
-          }
-
-          if( delivery.message.codec eq RawMessageCodec ) {
-            publish.payload(delivery.message.asInstanceOf[RawMessage].payload)
-          } else {
-            if( publish_body ) {
-              publish.payload(delivery.message.getBodyAs(classOf[Buffer]))
-            } else {
-              publish.payload(delivery.message.encoded)
-            }
-          }
-
-          handler.get.messages_sent += 1
-
-          if (delivery.ack!=null && (qos ne AT_MOST_ONCE)) {
-            publish.qos(qos)
-            val id = to_message_id(if(clean_session) {
-              get_next_seq_id // generate our own seq id.
-            } else {
-              delivery.seq // use the durable sub's seq id..
-            })
-
-            publish.messageId(id)
-            val request = Request(id, publish, (result)=>{acked(delivery, result)})
-            in_flight_publishes.put(id, request) match {
-              case Some(r) =>
-                // A reconnecting client could have acked before
-                // we get dispatched by the durable sub.
-                if( r.message == null ) {
-                  in_flight_publishes.remove(id)
-                  acked(delivery, Consumed)
-                } else {
-                  // Looks we sent out a msg with that id.  This could only
-                  // happen once we send out 0x7FFF message and the first
-                  // one has not been acked.
-                  handler.foreach(_.async_die("Client not acking regularly.", null))
-                }
-              case None =>
-            }
-            
-            Some(request)
-
-          } else {
-            // This callback gets executed once the message
-            // sent to the transport.
-            publish.qos(AT_MOST_ONCE)
-            Some(Request(0, publish, (result)=>{ acked(delivery, result) }))
-          }
-      }
-      
-    }, SessionDeliverySizer)
-
-    def acked(delivery:Delivery, result:DeliveryResult) = {
-      queue.assertExecuting()
-      credit_window_source.merge((delivery.size, 1))
-      if( delivery.ack!=null ) {
-        delivery.ack(result, null)
-      }
-    }
-    
-    credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
-
-    val session_manager:SessionSinkMux[Delivery] = new \
SessionSinkMux[Delivery](credit_window_filter, queue, Delivery, Integer.MAX_VALUE/2, \
                receive_buffer_size) {
-      override def time_stamp = host.broker.now
-    }
-
-    override def dispose() = queue {
-      super.dispose()
-    }
-
-    def dispatch_queue = queue
-    override def connection = handler.map(_.connection)
-    override def receive_buffer_size = 1024*64; // handler.codec.getWriteBufferSize
-    def is_persistent = false
-    def matches(delivery:Delivery):Boolean = true
-
-    //
-    // Each destination we subscribe to will establish a session with us.
-    //
-    class MqttConsumerSession(val producer:DeliveryProducer) extends DeliverySession \
                with SessionSinkFilter[Delivery] {
-      producer.dispatch_queue.assertExecuting()
-      retain
-
-      val downstream = session_manager.open(producer.dispatch_queue)
-
-      override def toString = "connection to \
                "+handler.map(_.connection.transport.getRemoteAddress).getOrElse("unconnected")
                
-
-      def consumer = mqtt_consumer
-      var closed = false
-
-      def close = {
-        assert(producer.dispatch_queue.isExecuting)
-        if( !closed ) {
-          closed = true
-          dispose
-        }
-      }
-
-      def dispose = {
-        session_manager.close(downstream, (delivery)=>{
-          // We have been closed so we have to nak any deliveries.
-          if( delivery.ack!=null ) {
-            delivery.ack(Undelivered, delivery.uow)
-          }
-        })
-        release
-      }
-
-      // Delegate all the flow control stuff to the session
-      override def full = {
-        val rc = super.full
-        rc
-      }
-
-      def offer(delivery:Delivery) = {
-        if( full ) {
-          false
-        } else {
-          delivery.message.retain()
-          val rc = downstream.offer(delivery)
-          assert(rc, "offer should be accepted since it was not full")
-          true
-        }
-      }
-
-    }
-    def connect(p:DeliveryProducer) = new MqttConsumerSession(p)
-  }
-
-}
+///**
+// * An MqttSession can be switch from one connection/protocol handler to another,
+// * but it will only be associated with one at a time. An MqttSession tracks
+// * the state of the communication with a client.
+// *
+// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+// */
+//case class MqttSession(host_state:HostState, client_id:UTF8Buffer, \
session_state:SessionState) { +//
+//  import MqttProtocolHandler._
+//
+//  def host = host_state.host
+//
+//  val queue = createQueue("mqtt: "+client_id)
+//  var manager_disconnected = false
+//
+//  var handler:Option[MqttProtocolHandler] = None
+//  var security_context:SecurityContext = _
+//  var clean_session = false
+//  var connect_message:CONNECT = _
+//  var destination_parser = MqttProtocol.destination_parser
+//
+//  def connect(next:MqttProtocolHandler):Unit = queue {
+//    if(manager_disconnected) {
+//      // we are not the assignment anymore.. go to the session manager
+//      // again to setup a new session.
+//      MqttSessionManager.attach(host, client_id, next)
+//    } else {
+//
+//      // so that we don't switch again until this current switch completes
+//      queue.suspend()
+//      if( handler != None ) {
+//        detach
+//        handler = None
+//      }
+//      queue {
+//        handler=Some(next)
+//        attach
+//      }
+//
+//      // switch the connection to the session queue..
+//      next.connection.set_dispatch_queue(queue) {
+//        queue.resume()
+//      }
+//    }
+//  }
+//
+//  def disconnect(prev:MqttProtocolHandler) = queue {
+//    if( handler==Some(prev) ) {
+//      MqttSessionManager.remove(host_state, client_id)
+//      manager_disconnected = true
+//      detach
+//      handler = None
+//    }
+//  }
+//  /////////////////////////////////////////////////////////////////////
+//  //
+//  // Bits that deal with connections attaching/detaching from the session
+//  //
+//  /////////////////////////////////////////////////////////////////////
+//  def attach = {
+//    queue.assertExecuting()
+//    val h = handler.get
+//    clean_session = h.connect_message.cleanSession()
+//    security_context = h.security_context
+//    h.command_handler = on_transport_command _
+//    destination_parser = h.destination_parser
+//    mqtt_consumer.consumer_sink.downstream = Some(h.sink_manager.open)
+//
+//    def ack_connect = {
+//      queue.assertExecuting()
+//      connect_message = h.connect_message
+//      val connack = new CONNACK
+//      connack.code(CONNECTION_ACCEPTED)
+//      send(connack)
+//    }
+//
+//    if( !clean_session ) {
+//      // Setup the previous subscriptions..
+//      session_state.strategy.create(host.store, client_id)
+//      if( !session_state.subscriptions.isEmpty ) {
+//        h.suspend_read("subscribing")
+//        subscribe(session_state.subscriptions.map(_._2._1)) {
+//          h.resume_read()
+//          h.queue {
+//            ack_connect
+//          }
+//        }
+//      } else {
+//        ack_connect
+//      }
+//    } else {
+//      // do we need to clear the received ids?
+//      // durable_session_state.received_message_ids.clear()
+//      session_state.subscriptions.clear()
+//      if( session_state.durable_sub !=null ) {
+//        var addresses = Array(session_state.durable_sub)
+//        session_state.durable_sub = null
+//        host.dispatch_queue {
+//          host.router.delete(addresses, security_context)
+//        }
+//      }
+//      session_state.strategy.destroy {
+//        ack_connect
+//      }
+//    }
+//
+//  }
+//
+//  def detach:Unit = {
+//    queue.assertExecuting()
+//
+//    if(!producerRoutes.isEmpty) {
+//      import collection.JavaConversions._
+//      val routes = producerRoutes.values.toSeq.toArray
+//      host.dispatch_queue {
+//        routes.foreach { route=>
+//          host.router.disconnect(Array(route.address), route)
+//        }
+//      }
+//      producerRoutes.clear
+//    }
+//
+//    if( clean_session ) {
+//      if(!mqtt_consumer.addresses.isEmpty) {
+//        var addresses = mqtt_consumer.addresses.keySet.toArray
+//        host.dispatch_queue {
+//          host.router.unbind(addresses, mqtt_consumer, false , security_context)
+//        }
+//        mqtt_consumer.addresses.clear()
+//      }
+//      session_state.subscriptions.clear()
+//    } else {
+//      if(session_state.durable_sub!=null) {
+//        var addresses = Array(session_state.durable_sub)
+//        host.dispatch_queue {
+//          host.router.unbind(addresses, mqtt_consumer, false , security_context)
+//        }
+//        mqtt_consumer.addresses.clear()
+//        session_state.durable_sub = null
+//      }
+//    }
+//
+//    in_flight_publishes.values.foreach { request =>
+//      if( request.ack!=null ) {
+//        if(request.delivered) {
+//          request.ack(Delivered)
+//        } else {
+//          request.ack(Undelivered)
+//        }
+//      }
+//    }
+//    in_flight_publishes.clear()
+//
+//    handler.get.sink_manager.close(mqtt_consumer.consumer_sink.downstream.get, \
(request)=>{}) +//    mqtt_consumer.consumer_sink.downstream = None
+//
+//    handler.get.on_transport_disconnected()
+//  }
+//
+//  def decode_destination(value:UTF8Buffer):SimpleAddress = {
+//    val rc = destination_parser.decode_single_destination(value.toString, \
(name)=>{ +//      SimpleAddress("topic", destination_parser.decode_path(name))
+//    })
+//    if( rc==null ) {
+//      handler.foreach(_.die("Invalid mqtt destination name: "+value))
+//    }
+//    rc
+//  }
+//
+//  /////////////////////////////////////////////////////////////////////
+//  //
+//  // Bits that deal with assigning message ids to QoS > 0 requests
+//  // and tracking those requests so that they can get replayed on a
+//  // reconnect.
+//  //
+//  /////////////////////////////////////////////////////////////////////
+//
+//  var in_flight_publishes = HashMap[Short, Request]()
+//
+//  def send(message: MessageSupport.Message): Unit = {
+//    queue.assertExecuting()
+//    handler.foreach(_.connection_sink.offer(Request(0, message, null)))
+//  }
+//
+//  def publish_completed(id: Short): Unit = {
+//    queue.assertExecuting()
+//    in_flight_publishes.remove(id) match {
+//      case Some(request) =>
+//        if ( request.ack != null ) {
+//          request.ack(Consumed)
+//        }
+//      case None =>
+//        // It's possible that on a reconnect, we get an ACK
+//        // in for message that was not dispatched yet. store
+//        // a place holder so we ack it upon the dispatch
+//        // attempt.
+//        in_flight_publishes.put(id, Request(id, null, null))
+//    }
+//  }
+//
+//  /////////////////////////////////////////////////////////////////////
+//  //
+//  // Bits that deal with processing new messages from the client.
+//  //
+//  /////////////////////////////////////////////////////////////////////
+//  def on_transport_command(command:AnyRef):Unit = command match {
+//    case command:MQTTFrame=>
+//
+//      command.messageType() match {
+//
+//        case PUBLISH.TYPE =>
+//          on_mqtt_publish(received(new PUBLISH().decode(command)))
+//
+//        // This follows a Publish with QoS EXACTLY_ONCE
+//        case PUBREL.TYPE =>
+//          var ack = received(new PUBREL().decode(command))
+//          // TODO: perhaps persist the processed list.. otherwise
+//          // we can't filter out dups after a broker restart.
+//          session_state.received_message_ids.remove(ack.messageId)
+//          session_state.strategy.update {
+//            send(new PUBCOMP().messageId(ack.messageId))
+//          }
+//
+//        case SUBSCRIBE.TYPE =>
+//          on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)))
+//
+//        case UNSUBSCRIBE.TYPE =>
+//          on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)))
+//
+//        // AT_LEAST_ONCE ack flow for a client subscription
+//        case PUBACK.TYPE =>
+//          val ack = received(new PUBACK().decode(command))
+//          publish_completed(ack.messageId)
+//
+//        // EXACTLY_ONCE ack flow for a client subscription
+//        case PUBREC.TYPE =>
+//          val ack = received(new PUBREC().decode(command))
+//          send(new PUBREL().messageId(ack.messageId))
+//
+//        case PUBCOMP.TYPE =>
+//          val ack: PUBCOMP = received(new PUBCOMP().decode(command))
+//          publish_completed(ack.messageId)
+//
+//        case PINGREQ.TYPE =>
+//          received(new PINGREQ().decode(command))
+//          send(new PINGRESP())
+//
+//        case DISCONNECT.TYPE =>
+//          received(new DISCONNECT())
+//          MqttSessionManager.disconnect(host_state, client_id, handler.get)
+//
+//        case _ =>
+//          handler.get.die("Invalid MQTT message type: "+command.messageType());
+//      }
+//    case "failure" =>
+//      // Publish the client's will
+//      publish_will {
+//        // then disconnect him.
+//        MqttSessionManager.disconnect(host_state, client_id, handler.get)
+//      }
+//
+//    case _=>
+//      handler.get.die("Internal Server Error: unexpected mqtt command: \
"+command.getClass); +//  }
+//
+//  /////////////////////////////////////////////////////////////////////
+//  //
+//  // Bits that deal with processing PUBLISH messages
+//  //
+//  /////////////////////////////////////////////////////////////////////
+//  var producerRoutes = new LRUCache[UTF8Buffer, MqttProducerRoute](10) {
+//    override def onCacheEviction(eldest: Entry[UTF8Buffer, MqttProducerRoute]) = {
+//      host.dispatch_queue {
+//        host.router.disconnect(Array(eldest.getValue.address), eldest.getValue)
+//      }
+//    }
+//  }
+//  case class MqttProducerRoute(address:SimpleAddress, handler:MqttProtocolHandler) \
extends DeliveryProducerRoute(host.router) { +//    override def send_buffer_size = \
handler.codec.getReadBufferSize +//    override def connection = \
Some(handler.connection) +//    override def dispatch_queue = queue
+//
+//    var suspended = false
+//
+//    refiller = ^ {
+//      if( suspended ) {
+//        suspended = false
+//        handler.resume_read
+//      }
+//    }
+//
+//  }
+//
+//  def on_mqtt_publish(publish:PUBLISH):Unit = {
+//
+//    if( (publish.qos eq EXACTLY_ONCE) && \
session_state.received_message_ids.contains(publish.messageId)) { +//      val \
response = new PUBREC +//      response.messageId(publish.messageId)
+//      send(response)
+//      return
+//    }
+//
+//    handler.get.messages_received += 1
+//
+//    queue.assertExecuting()
+//    producerRoutes.get(publish.topicName()) match {
+//      case null =>
+//        // create the producer route...
+//
+//        val destination = decode_destination(publish.topicName())
+//        val route = MqttProducerRoute(destination, handler.get)
+//
+//        // don't process commands until producer is connected...
+//        route.handler.suspend_read("route publish lookup")
+//        host.dispatch_queue {
+//          host.router.connect(Array(destination), route, security_context)
+//          queue {
+//            // We don't care if we are not allowed to send..
+//            if (!route.handler.connection.stopped) {
+//              route.handler.resume_read
+//              producerRoutes.put(publish.topicName(), route)
+//              send_via_route(route, publish)
+//            }
+//          }
+//        }
+//
+//      case route =>
+//        // we can re-use the existing producer route
+//        send_via_route(route, publish)
+//    }
+//  }
+//
+//  def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
+//    queue.assertExecuting()
+//
+//    def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
+//      val response = new PUBACK
+//      response.messageId(publish.messageId)
+//      send(response)
+//    }
+//
+//    def exactly_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
+//      queue.assertExecuting()
+//      // TODO: perhaps persist the processed list..
+//      session_state.received_message_ids.add(publish.messageId)
+//      session_state.strategy.update {
+//        val response = new PUBREC
+//        response.messageId(publish.messageId)
+//        send(response)
+//      }
+//    }
+//
+//    val ack = publish.qos match {
+//      case AT_LEAST_ONCE => at_least_once_ack _
+//      case EXACTLY_ONCE => exactly_once_ack _
+//      case AT_MOST_ONCE => null
+//    }
+//
+//    if( !route.targets.isEmpty ) {
+//      val delivery = new Delivery
+//      delivery.message = RawMessage(publish.payload)
+//      delivery.persistent = publish.qos().ordinal() > 0
+//      delivery.size = publish.payload.length
+//      delivery.ack = ack
+//      if( publish.retain() ) {
+//        if( delivery.size == 0 ) {
+//          delivery.retain = RetainRemove
+//        } else {
+//          delivery.retain = RetainSet
+//        }
+//      }
+//
+//      // routes can always accept at least 1 delivery...
+//      assert( !route.full )
+//      route.offer(delivery)
+//      if( route.full ) {
+//        // but once it gets full.. suspend to flow control the producer.
+//        route.suspended = true
+//        handler.get.suspend_read("blocked sending to: \
"+route.overflowSessions.mkString(", ")) +//      }
+//
+//    } else {
+//      ack(null, null)
+//    }
+//  }
+//
+//
+//  //
+//  def publish_will(complete_close: =>Unit) = {
+//    if(connect_message!=null) {
+//      if( connect_message.willTopic()==null ) {
+//        complete_close
+//      } else {
+//
+//        val destination = decode_destination(connect_message.willTopic())
+//        val prodcuer = new DeliveryProducerRoute(host.router) {
+//          override def send_buffer_size = 1024*64
+//          override def connection = handler.map(_.connection)
+//          override def dispatch_queue = queue
+//          refiller = NOOP
+//        }
+//
+//        host.dispatch_queue {
+//          host.router.connect(Array(destination), prodcuer, security_context)
+//          queue {
+//            if(prodcuer.targets.isEmpty) {
+//              complete_close
+//            } else {
+//              val delivery = new Delivery
+//              delivery.message = RawMessage(connect_message.willMessage())
+//              delivery.size = connect_message.willMessage().length
+//              delivery.persistent = connect_message.willQos().ordinal() > 0
+//              if( connect_message.willRetain() ) {
+//                if( delivery.size == 0 ) {
+//                  delivery.retain = RetainRemove
+//                } else {
+//                  delivery.retain = RetainSet
+//                }
+//              }
+//
+//              delivery.ack = (x,y) => {
+//                host.dispatch_queue {
+//                  host.router.disconnect(Array(destination), prodcuer)
+//                }
+//                complete_close
+//              }
+//              handler.get.messages_received += 1
+//              prodcuer.offer(delivery)
+//            }
+//          }
+//        }
+//      }
+//    }
+//  }
+//  /////////////////////////////////////////////////////////////////////
+//  //
+//  // Bits that deal with subscriptions
+//  //
+//  /////////////////////////////////////////////////////////////////////
+//
+//  def on_mqtt_subscribe(sub:SUBSCRIBE):Unit = {
+//    subscribe(sub.topics()) {
+//      queue {
+//        session_state.strategy.update {
+//          val suback = new SUBACK
+//          suback.messageId(sub.messageId())
+//          suback.grantedQos(sub.topics().map(_.qos().ordinal().toByte))
+//          send(suback)
+//        }
+//      }
+//    }
+//  }
+//
+//  def subscribe(topics:Traversable[Topic])(on_subscribed: => Unit):Unit = {
+//    var addresses:Array[_ <: BindAddress] = topics.toArray.map { topic =>
+//      var address:BindAddress = decode_destination(topic.name)
+//      session_state.subscriptions += topic.name -> (topic, address)
+//      mqtt_consumer.addresses += address -> topic.qos
+//      if(PathParser.containsWildCards(address.path)) {
+//        mqtt_consumer.wildcards.put( address.path, topic.qos() )
+//      }
+//      address
+//    }
+//
+//    handler.get.subscription_count = mqtt_consumer.addresses.size
+//
+//    addresses = if( clean_session ) {
+//      addresses
+//    } else {
+//      session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
null, mqtt_consumer.addresses.keySet.toArray) +//      \
Array(session_state.durable_sub) +//    }
+//
+//    host.dispatch_queue {
+//      addresses.foreach { address=>
+//        host.router.bind(Array[BindAddress](address), mqtt_consumer, \
security_context) { result => +//        // MQTT ignores subscribe failures.
+//        }
+//      }
+//      on_subscribed
+//    }
+//
+//  }
+//
+//  def on_mqtt_unsubscribe(unsubscribe:UNSUBSCRIBE):Unit = {
+//
+//    val addresses:Array[_ <: BindAddress] = unsubscribe.topics.flatMap { topic =>
+//      session_state.subscriptions.remove(topic).map { case (topic, address)=>
+//        mqtt_consumer.addresses.remove(address)
+//        if(PathParser.containsWildCards(address.path)) {
+//          mqtt_consumer.wildcards.remove(address.path, topic.qos)
+//        }
+//        address
+//      }
+//    }
+//
+//    handler.get.subscription_count = mqtt_consumer.addresses.size
+//
+//    if(!clean_session) {
+//      session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
null, mqtt_consumer.addresses.keySet.toArray) +//    }
+//
+//    host.dispatch_queue {
+//      if(clean_session) {
+//        host.router.unbind(addresses, mqtt_consumer, false, security_context)
+//      } else {
+//        if( mqtt_consumer.addresses.isEmpty ) {
+//          host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, \
true, security_context) +//          session_state.durable_sub = null
+//        } else {
+//          host.router.bind(Array(session_state.durable_sub), mqtt_consumer, \
security_context) { result => +//          }
+//        }
+//      }
+//      queue {
+//        session_state.strategy.update {
+//          val ack = new UNSUBACK
+//          ack.messageId(unsubscribe.messageId())
+//          send(ack)
+//        }
+//      }
+//    }
+//
+//  }
+//
+//  var publish_body = false
+//
+//  lazy val mqtt_consumer = new MqttConsumer
+//  class MqttConsumer extends BaseRetained with DeliveryConsumer {
+//
+//    override def toString = "mqtt client:"+client_id+" remote address: \
"+security_context.remote_address +//
+//    val addresses = HashMap[BindAddress, QoS]()
+//    val wildcards = new PathMap[QoS]()
+//
+//    val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, \
Int)] { +//      def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
+//        if( previous == null ) {
+//          event
+//        } else {
+//          (previous._1+event._1, previous._2+event._2)
+//        }
+//      }
+//      def mergeEvents(previous:(Int, Int), events:(Int, Int)) = \
mergeEvent(previous, events) +//    }, dispatch_queue)
+//
+//    credit_window_source.setEventHandler(^{
+//      val data = credit_window_source.getData
+//      credit_window_filter.credit(data._1, data._2)
+//    });
+//    credit_window_source.resume
+//
+//    val consumer_sink = new MutableSink[Request]()
+//    consumer_sink.downstream = None
+//
+//    var next_seq_id = 1L
+//    def get_next_seq_id = {
+//      val rc = next_seq_id
+//      next_seq_id += 1
+//      rc
+//    }
+//
+//    def to_message_id(value:Long):Short = (
+//        0x8000 | // MQTT message ids cannot be zero, so we always set the highest \
bit. +//        (value & 0x7FFF) // the lower 15 bits come for the original seq id.
+//      ).toShort
+//
+//    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], \
Delivery)](consumer_sink.flatMap{ event => +//      queue.assertExecuting()
+//      val (session, delivery) = event
+//
+//      session_manager.delivered(session, delivery.size)
+//
+//      // Look up which QoS we need to send this message with..
+//      var topic = delivery.sender.head.simple
+//      import collection.JavaConversions._
+//      addresses.get(topic).orElse(wildcards.get(topic.path).headOption) match {
+//
+//        case None =>
+//          // draining messages after an un-subscribe
+//          acked(delivery, Consumed)
+//          None
+//
+//        case Some(qos) =>
+//
+//          // Convert the Delivery into a Request
+//          var publish = new PUBLISH
+//          publish.topicName(new \
UTF8Buffer(destination_parser.encode_destination(Array(delivery.sender.head)))) +//   \
if( delivery.redeliveries > 0) { +//            publish.dup(true)
+//          }
+//
+//          if( delivery.message.codec eq RawMessageCodec ) {
+//            publish.payload(delivery.message.asInstanceOf[RawMessage].payload)
+//          } else {
+//            if( publish_body ) {
+//              publish.payload(delivery.message.getBodyAs(classOf[Buffer]))
+//            } else {
+//              publish.payload(delivery.message.encoded)
+//            }
+//          }
+//
+//          handler.get.messages_sent += 1
+//
+//          if (delivery.ack!=null && (qos ne AT_MOST_ONCE)) {
+//            publish.qos(qos)
+//            val id = to_message_id(if(clean_session) {
+//              get_next_seq_id // generate our own seq id.
+//            } else {
+//              delivery.seq // use the durable sub's seq id..
+//            })
+//
+//            publish.messageId(id)
+//            val request = Request(id, publish, (result)=>{acked(delivery, \
result)}) +//            in_flight_publishes.put(id, request) match {
+//              case Some(r) =>
+//                // A reconnecting client could have acked before
+//                // we get dispatched by the durable sub.
+//                if( r.message == null ) {
+//                  in_flight_publishes.remove(id)
+//                  acked(delivery, Consumed)
+//                } else {
+//                  // Looks we sent out a msg with that id.  This could only
+//                  // happen once we send out 0x7FFF message and the first
+//                  // one has not been acked.
+//                  handler.foreach(_.async_die("Client not acking regularly.", \
null)) +//                }
+//              case None =>
+//            }
+//
+//            Some(request)
+//
+//          } else {
+//            // This callback gets executed once the message
+//            // sent to the transport.
+//            publish.qos(AT_MOST_ONCE)
+//            Some(Request(0, publish, (result)=>{ acked(delivery, result) }))
+//          }
+//      }
+//
+//    }, SessionDeliverySizer)
+//
+//    def acked(delivery:Delivery, result:DeliveryResult) = {
+//      queue.assertExecuting()
+//      credit_window_source.merge((delivery.size, 1))
+//      if( delivery.ack!=null ) {
+//        delivery.ack(result, null)
+//      }
+//    }
+//
+//    credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
+//
+//    val session_manager:SessionSinkMux[Delivery] = new \
SessionSinkMux[Delivery](credit_window_filter, queue, Delivery, Integer.MAX_VALUE/2, \
receive_buffer_size) { +//      override def time_stamp = host.broker.now
+//    }
+//
+//    override def dispose() = queue {
+//      super.dispose()
+//    }
+//
+//    def dispatch_queue = queue
+//    override def connection = handler.map(_.connection)
+//    override def receive_buffer_size = 1024*64; // \
handler.codec.getWriteBufferSize +//    def is_persistent = false
+//    def matches(delivery:Delivery):Boolean = true
+//
+//    //
+//    // Each destination we subscribe to will establish a session with us.
+//    //
+//    class MqttConsumerSession(val producer:DeliveryProducer) extends \
DeliverySession with SessionSinkFilter[Delivery] { +//      \
producer.dispatch_queue.assertExecuting() +//      retain
+//
+//      val downstream = session_manager.open(producer.dispatch_queue)
+//
+//      override def toString = "connection to \
"+handler.map(_.connection.transport.getRemoteAddress).getOrElse("unconnected") +//
+//      def consumer = mqtt_consumer
+//      var closed = false
+//
+//      def close = {
+//        assert(producer.dispatch_queue.isExecuting)
+//        if( !closed ) {
+//          closed = true
+//          dispose
+//        }
+//      }
+//
+//      def dispose = {
+//        session_manager.close(downstream, (delivery)=>{
+//          // We have been closed so we have to nak any deliveries.
+//          if( delivery.ack!=null ) {
+//            delivery.ack(Undelivered, delivery.uow)
+//          }
+//        })
+//        release
+//      }
+//
+//      // Delegate all the flow control stuff to the session
+//      override def full = {
+//        val rc = super.full
+//        rc
+//      }
+//
+//      def offer(delivery:Delivery) = {
+//        if( full ) {
+//          false
+//        } else {
+//          delivery.message.retain()
+//          val rc = downstream.offer(delivery)
+//          assert(rc, "offer should be accepted since it was not full")
+//          true
+//        }
+//      }
+//
+//    }
+//    def connect(p:DeliveryProducer) = new MqttConsumerSession(p)
+//  }
+//
+//}


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic