[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r1461839 [1/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/ap
From: chirino () apache ! org
Date: 2013-03-27 20:20:32
Message-ID: 20130327202032.C4BBE23888CD () eris ! apache ! org
[Download RAW message or body]
Author: chirino
Date: Wed Mar 27 20:20:31 2013
New Revision: 1461839
URL: http://svn.apache.org/r1461839
Log:
Starting to port the MQTT module to be a pure java impl, just to see how much effort \
that would take.
Added:
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java
- copied, changed from r1459409, \
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala
Removed:
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-scala/pom.xml
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/mai \
n/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala \
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala \
Wed Mar 27 20:20:31 2013 @@ -40,6 +40,12 @@ abstract class Connection() extends Base
private var _dispatch_queue = createQueue()
def dispatch_queue = _dispatch_queue
+ def _set_dispatch_queue(next_queue:DispatchQueue, on_complete:Task) {
+ set_dispatch_queue(next_queue) {
+ on_complete.run()
+ }
+ }
+
def set_dispatch_queue(next_queue:DispatchQueue)(on_complete: =>Unit) {
_dispatch_queue {
if(transport!=null) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/mai \
n/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala \
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala \
Wed Mar 27 20:20:31 2013 @@ -74,6 +74,8 @@ trait DeliveryConsumer extends Retained
def is_persistent:Boolean
}
+abstract class AbstractRetainedDeliveryConsumer extends BaseRetained with \
DeliveryConsumer +
class DeliveryConsumerFilter(val next:DeliveryConsumer) extends DeliveryConsumer {
override def browser: Boolean = next.browser
override def close_on_drain: Boolean = next.close_on_drain
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/mai \
n/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala \
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala \
Wed Mar 27 20:20:31 2013 @@ -331,13 +331,22 @@ trait SessionSink[T] extends Sink[T] {
}
trait SessionSinkFilter[T] extends SessionSink[T] with SinkFilter[T] {
- def downstream:SessionSink[T]
+ def downstream: SessionSink[T]
def enqueue_item_counter = downstream.enqueue_item_counter
def enqueue_size_counter = downstream.enqueue_size_counter
def enqueue_ts = downstream.enqueue_ts
def remaining_capacity = downstream.remaining_capacity
}
+abstract class AbstractSessionSinkFilter[T] extends SessionSink[T] with \
SinkFilter[T] { + def downstream:Sink[T] = downstream_session_sink
+ def downstream_session_sink: SessionSink[T]
+ def enqueue_item_counter = downstream_session_sink.enqueue_item_counter
+ def enqueue_size_counter = downstream_session_sink.enqueue_size_counter
+ def enqueue_ts = downstream_session_sink.enqueue_ts
+ def remaining_capacity = downstream_session_sink.remaining_capacity
+}
+
/**
* <p>
* A SinkMux multiplexes access to a target sink so that multiple
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java?rev=1461839&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java \
(added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.java \
Wed Mar 27 20:20:31 2013 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.DestinationParser;
+import org.apache.activemq.apollo.broker.protocol.Protocol;
+import org.apache.activemq.apollo.broker.protocol.ProtocolHandler;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttProtocol extends MqttProtocolCodecFactory implements Protocol {
+
+ static final DestinationParser destination_parser = new DestinationParser();
+ static final AsciiBuffer PROTOCOL_ID = new AsciiBuffer(id);
+ static {
+ destination_parser.queue_prefix_$eq(null);
+ destination_parser.topic_prefix_$eq(null);
+ destination_parser.path_separator_$eq("/");
+ destination_parser.any_child_wildcard_$eq("+");
+ destination_parser.any_descendant_wildcard_$eq("#");
+ destination_parser.dsub_prefix_$eq(null);
+ destination_parser.temp_queue_prefix_$eq(null);
+ destination_parser.temp_topic_prefix_$eq(null);
+ destination_parser.destination_separator_$eq(null);
+ destination_parser.regex_wildcard_end_$eq(null);
+ destination_parser.regex_wildcard_end_$eq(null);
+ destination_parser.part_pattern_$eq(null);
+ }
+
+
+ @Override
+ public ProtocolHandler createProtocolHandler() {
+ return new MqttProtocolHandler();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/ \
scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java?rev=1461839&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java \
(added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolCodecFactory.java \
Wed Mar 27 20:20:31 2013 @@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.Broker$;
+import org.apache.activemq.apollo.broker.Connector;
+import org.apache.activemq.apollo.broker.protocol.ProtocolCodecFactory;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.ProtocolCodec;
+import org.fusesource.mqtt.codec.MQTTProtocolCodec;
+
+/**
+ * Creates MqttCodec objects that encode/decode the
+ * <a href="http://activemq.apache.org/mqtt/">Mqtt</a> protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttProtocolCodecFactory implements ProtocolCodecFactory.Provider {
+
+ //
+ // An MQTT CONNECT message has between 10-13 bytes:
+ // Message Type : 0x10 @ [0]
+ // Remaining Length : Byte{1-4} @ [1]
+ // Protocol Name : 0x00 0x06 'M' 'Q' 'I' 's' 'd' 'p' @ [2|3|4|5]
+ //
+ static final String id = "mqtt";
+ static final Buffer HEAD_MAGIC = new Buffer(new byte []{ 0x10 });
+ static final Buffer TAIL_MAGIC = new Buffer(new byte []{ 0x00, 0x06, 'M', 'Q', \
'I', 's', 'd', 'p'}); +
+ @Override
+ public String id() {
+ return id;
+ }
+
+ @Override
+ public ProtocolCodec createProtocolCodec(Connector connector) {
+ MQTTProtocolCodec rc = new MQTTProtocolCodec();
+ rc.setBufferPools(Broker$.MODULE$.buffer_pools());
+ return rc;
+ }
+
+ @Override
+ public boolean isIdentifiable() {
+ return true;
+ }
+
+ @Override
+ public int maxIdentificaionLength() {
+ return 13;
+ }
+
+ @Override
+ public boolean matchesIdentification(Buffer header) {
+ if (header.length < 10) {
+ return false;
+ } else {
+ return header.startsWith(HEAD_MAGIC) && header.indexOf(TAIL_MAGIC, 2) < 6;
+ }
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/ \
scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala \
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala \
Wed Mar 27 20:20:31 2013 @@ -46,12 +46,17 @@ import scala.Some
import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState
import org.apache.activemq.apollo.broker.SubscriptionAddress
+case class Request(id:Short, message:MessageSupport.Message, \
ack:UnitFn1[DeliveryResult]) { + val frame = if(message==null) null else \
message.encode() + var delivered = false
+}
+
+object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+ def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+}
+
object MqttProtocolHandler extends Log {
- case class Request(id:Short, message:MessageSupport.Message, \
ack:(DeliveryResult)=>Unit) {
- val frame = if(message==null) null else message.encode()
- var delivered = false
- }
def received[T](value:T):T = {
trace("received: %s", value)
@@ -60,9 +65,6 @@ object MqttProtocolHandler extends Log {
val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
- object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
- def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
- }
}
/**
@@ -202,6 +204,7 @@ class MqttProtocolHandler extends Protoc
/////////////////////////////////////////////////////////////////////
var status = WAITING_ON_CLIENT_REQUEST
+ def _suspend_read(reason: String) = suspend_read(reason)
def suspend_read(reason: => String) = {
status = reason _
connection.transport.suspendRead
@@ -414,15 +417,15 @@ class MqttProtocolHandler extends Protoc
// Other msic bits.
//
/////////////////////////////////////////////////////////////////////
- var messages_sent = 0L
- var messages_received = 0L
+ val messages_sent = new LongCounter()
+ var messages_received = new LongCounter()
var subscription_count = 0
override def create_connection_status = {
var rc = new MqttConnectionStatusDTO
rc.protocol_version = "3.1"
- rc.messages_sent = messages_sent
- rc.messages_received = messages_received
+ rc.messages_sent = messages_sent.get()
+ rc.messages_received = messages_received.get
rc.subscription_count = subscription_count
rc.waiting_on = status()
rc
@@ -446,17 +449,17 @@ object MqttSessionManager {
class SessionState {
var durable_sub:SubscriptionAddress = _
- val subscriptions = HashMap[UTF8Buffer, (Topic, BindAddress)]()
+ val subscriptions = new java.util.HashMap[UTF8Buffer, (Topic, BindAddress)]()
val received_message_ids: HashSet[Short] = new HashSet[Short]
trait StorageStrategy {
- def update(cb: =>Unit)
- def destroy(cb: =>Unit)
+ def update(cb: Task)
+ def destroy(cb: Task)
def create(store:Store, client_id:UTF8Buffer)
}
case class NoopStrategy() extends StorageStrategy {
- def update(cb: =>Unit) = { cb }
- def destroy(cb: =>Unit) { cb }
+ def update(cb: Task) = { cb.run() }
+ def destroy(cb: Task) { cb.run() }
def create(store:Store, client_id:UTF8Buffer) = {
if(store!=null)
strategy = StoreStrategy(store, client_id)
@@ -465,12 +468,14 @@ object MqttSessionManager {
case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends \
StorageStrategy { val session_key = new UTF8Buffer("mqtt:"+client_id)
- def update(cb: =>Unit) = {
+ def update(cb: Task) = {
val uow = store.create_uow
val session_pb = new SessionPB.Bean
session_pb.setClientId(client_id)
received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
- subscriptions.values.foreach { case (topic, address) =>
+
+ import collection.JavaConversions._
+ for( (topic, address) <- subscriptions.values) {
val topic_pb = new TopicPB.Bean
topic_pb.setName(topic.name())
topic_pb.setQos(topic.qos().ordinal())
@@ -482,20 +487,20 @@ object MqttSessionManager {
val current = getCurrentQueue
uow.on_complete {
current {
- cb
+ cb.run()
}
}
uow.release
}
- def destroy(cb: =>Unit) {
+ def destroy(cb: Task) {
val uow = store.create_uow
uow.put(session_key, null)
val current = getCurrentQueue
uow.on_complete {
current {
strategy = NoopStrategy()
- cb
+ cb.run()
}
}
uow.release
@@ -564,7 +569,7 @@ object MqttSessionManager {
} else {
host_state.session_states.getOrElseUpdate(client_id, new SessionState())
}
- val assignment = MqttSession(host_state, client_id, state)
+ val assignment = new MqttSession(host_state, client_id, state)
assignment.connect(handler)
host_state.sessions.put(client_id, assignment)
}
@@ -583,724 +588,724 @@ object MqttSessionManager {
}
}
-/**
- * An MqttSession can be switch from one connection/protocol handler to another,
- * but it will only be associated with one at a time. An MqttSession tracks
- * the state of the communication with a client.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-case class MqttSession(host_state:HostState, client_id:UTF8Buffer, \
session_state:SessionState) {
-
- import MqttProtocolHandler._
-
- def host = host_state.host
-
- val queue = createQueue("mqtt: "+client_id)
- var manager_disconnected = false
-
- var handler:Option[MqttProtocolHandler] = None
- var security_context:SecurityContext = _
- var clean_session = false
- var connect_message:CONNECT = _
- var destination_parser = MqttProtocol.destination_parser
-
- def connect(next:MqttProtocolHandler):Unit = queue {
- if(manager_disconnected) {
- // we are not the assignment anymore.. go to the session manager
- // again to setup a new session.
- MqttSessionManager.attach(host, client_id, next)
- } else {
-
- // so that we don't switch again until this current switch completes
- queue.suspend()
- if( handler != None ) {
- detach
- handler = None
- }
- queue {
- handler=Some(next)
- attach
- }
-
- // switch the connection to the session queue..
- next.connection.set_dispatch_queue(queue) {
- queue.resume()
- }
- }
- }
-
- def disconnect(prev:MqttProtocolHandler) = queue {
- if( handler==Some(prev) ) {
- MqttSessionManager.remove(host_state, client_id)
- manager_disconnected = true
- detach
- handler = None
- }
- }
- /////////////////////////////////////////////////////////////////////
- //
- // Bits that deal with connections attaching/detaching from the session
- //
- /////////////////////////////////////////////////////////////////////
- def attach = {
- queue.assertExecuting()
- val h = handler.get
- clean_session = h.connect_message.cleanSession()
- security_context = h.security_context
- h.command_handler = on_transport_command _
- destination_parser = h.destination_parser
- mqtt_consumer.consumer_sink.downstream = Some(h.sink_manager.open)
-
- def ack_connect = {
- queue.assertExecuting()
- connect_message = h.connect_message
- val connack = new CONNACK
- connack.code(CONNECTION_ACCEPTED)
- send(connack)
- }
-
- if( !clean_session ) {
- // Setup the previous subscriptions..
- session_state.strategy.create(host.store, client_id)
- if( !session_state.subscriptions.isEmpty ) {
- h.suspend_read("subscribing")
- subscribe(session_state.subscriptions.map(_._2._1)) {
- h.resume_read()
- h.queue {
- ack_connect
- }
- }
- } else {
- ack_connect
- }
- } else {
- // do we need to clear the received ids?
- // durable_session_state.received_message_ids.clear()
- session_state.subscriptions.clear()
- if( session_state.durable_sub !=null ) {
- var addresses = Array(session_state.durable_sub)
- session_state.durable_sub = null
- host.dispatch_queue {
- host.router.delete(addresses, security_context)
- }
- }
- session_state.strategy.destroy {
- ack_connect
- }
- }
-
- }
-
- def detach:Unit = {
- queue.assertExecuting()
-
- if(!producerRoutes.isEmpty) {
- import collection.JavaConversions._
- val routes = producerRoutes.values.toSeq.toArray
- host.dispatch_queue {
- routes.foreach { route=>
- host.router.disconnect(Array(route.address), route)
- }
- }
- producerRoutes.clear
- }
-
- if( clean_session ) {
- if(!mqtt_consumer.addresses.isEmpty) {
- var addresses = mqtt_consumer.addresses.keySet.toArray
- host.dispatch_queue {
- host.router.unbind(addresses, mqtt_consumer, false , security_context)
- }
- mqtt_consumer.addresses.clear()
- }
- session_state.subscriptions.clear()
- } else {
- if(session_state.durable_sub!=null) {
- var addresses = Array(session_state.durable_sub)
- host.dispatch_queue {
- host.router.unbind(addresses, mqtt_consumer, false , security_context)
- }
- mqtt_consumer.addresses.clear()
- session_state.durable_sub = null
- }
- }
-
- in_flight_publishes.values.foreach { request =>
- if( request.ack!=null ) {
- if(request.delivered) {
- request.ack(Delivered)
- } else {
- request.ack(Undelivered)
- }
- }
- }
- in_flight_publishes.clear()
-
- handler.get.sink_manager.close(mqtt_consumer.consumer_sink.downstream.get, \
(request)=>{})
- mqtt_consumer.consumer_sink.downstream = None
-
- handler.get.on_transport_disconnected()
- }
-
- def decode_destination(value:UTF8Buffer):SimpleAddress = {
- val rc = destination_parser.decode_single_destination(value.toString, (name)=>{
- SimpleAddress("topic", destination_parser.decode_path(name))
- })
- if( rc==null ) {
- handler.foreach(_.die("Invalid mqtt destination name: "+value))
- }
- rc
- }
-
- /////////////////////////////////////////////////////////////////////
- //
- // Bits that deal with assigning message ids to QoS > 0 requests
- // and tracking those requests so that they can get replayed on a
- // reconnect.
- //
- /////////////////////////////////////////////////////////////////////
-
- var in_flight_publishes = HashMap[Short, Request]()
-
- def send(message: MessageSupport.Message): Unit = {
- queue.assertExecuting()
- handler.foreach(_.connection_sink.offer(Request(0, message, null)))
- }
-
- def publish_completed(id: Short): Unit = {
- queue.assertExecuting()
- in_flight_publishes.remove(id) match {
- case Some(request) =>
- if ( request.ack != null ) {
- request.ack(Consumed)
- }
- case None =>
- // It's possible that on a reconnect, we get an ACK
- // in for message that was not dispatched yet. store
- // a place holder so we ack it upon the dispatch
- // attempt.
- in_flight_publishes.put(id, Request(id, null, null))
- }
- }
-
- /////////////////////////////////////////////////////////////////////
- //
- // Bits that deal with processing new messages from the client.
- //
- /////////////////////////////////////////////////////////////////////
- def on_transport_command(command:AnyRef):Unit = command match {
- case command:MQTTFrame=>
-
- command.messageType() match {
-
- case PUBLISH.TYPE =>
- on_mqtt_publish(received(new PUBLISH().decode(command)))
-
- // This follows a Publish with QoS EXACTLY_ONCE
- case PUBREL.TYPE =>
- var ack = received(new PUBREL().decode(command))
- // TODO: perhaps persist the processed list.. otherwise
- // we can't filter out dups after a broker restart.
- session_state.received_message_ids.remove(ack.messageId)
- session_state.strategy.update {
- send(new PUBCOMP().messageId(ack.messageId))
- }
-
- case SUBSCRIBE.TYPE =>
- on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)))
-
- case UNSUBSCRIBE.TYPE =>
- on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)))
-
- // AT_LEAST_ONCE ack flow for a client subscription
- case PUBACK.TYPE =>
- val ack = received(new PUBACK().decode(command))
- publish_completed(ack.messageId)
-
- // EXACTLY_ONCE ack flow for a client subscription
- case PUBREC.TYPE =>
- val ack = received(new PUBREC().decode(command))
- send(new PUBREL().messageId(ack.messageId))
-
- case PUBCOMP.TYPE =>
- val ack: PUBCOMP = received(new PUBCOMP().decode(command))
- publish_completed(ack.messageId)
-
- case PINGREQ.TYPE =>
- received(new PINGREQ().decode(command))
- send(new PINGRESP())
-
- case DISCONNECT.TYPE =>
- received(new DISCONNECT())
- MqttSessionManager.disconnect(host_state, client_id, handler.get)
-
- case _ =>
- handler.get.die("Invalid MQTT message type: "+command.messageType());
- }
- case "failure" =>
- // Publish the client's will
- publish_will {
- // then disconnect him.
- MqttSessionManager.disconnect(host_state, client_id, handler.get)
- }
-
- case _=>
- handler.get.die("Internal Server Error: unexpected mqtt command: \
"+command.getClass);
- }
-
- /////////////////////////////////////////////////////////////////////
- //
- // Bits that deal with processing PUBLISH messages
- //
- /////////////////////////////////////////////////////////////////////
- var producerRoutes = new LRUCache[UTF8Buffer, MqttProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[UTF8Buffer, MqttProducerRoute]) = {
- host.dispatch_queue {
- host.router.disconnect(Array(eldest.getValue.address), eldest.getValue)
- }
- }
- }
- case class MqttProducerRoute(address:SimpleAddress, handler:MqttProtocolHandler) \
extends DeliveryProducerRoute(host.router) {
- override def send_buffer_size = handler.codec.getReadBufferSize
- override def connection = Some(handler.connection)
- override def dispatch_queue = queue
-
- var suspended = false
-
- refiller = ^ {
- if( suspended ) {
- suspended = false
- handler.resume_read
- }
- }
-
- }
-
- def on_mqtt_publish(publish:PUBLISH):Unit = {
-
- if( (publish.qos eq EXACTLY_ONCE) && \
session_state.received_message_ids.contains(publish.messageId)) {
- val response = new PUBREC
- response.messageId(publish.messageId)
- send(response)
- return
- }
-
- handler.get.messages_received += 1
-
- queue.assertExecuting()
- producerRoutes.get(publish.topicName()) match {
- case null =>
- // create the producer route...
-
- val destination = decode_destination(publish.topicName())
- val route = MqttProducerRoute(destination, handler.get)
-
- // don't process commands until producer is connected...
- route.handler.suspend_read("route publish lookup")
- host.dispatch_queue {
- host.router.connect(Array(destination), route, security_context)
- queue {
- // We don't care if we are not allowed to send..
- if (!route.handler.connection.stopped) {
- route.handler.resume_read
- producerRoutes.put(publish.topicName(), route)
- send_via_route(route, publish)
- }
- }
- }
-
- case route =>
- // we can re-use the existing producer route
- send_via_route(route, publish)
- }
- }
-
- def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
- queue.assertExecuting()
-
- def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
- val response = new PUBACK
- response.messageId(publish.messageId)
- send(response)
- }
-
- def exactly_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
- queue.assertExecuting()
- // TODO: perhaps persist the processed list..
- session_state.received_message_ids.add(publish.messageId)
- session_state.strategy.update {
- val response = new PUBREC
- response.messageId(publish.messageId)
- send(response)
- }
- }
-
- val ack = publish.qos match {
- case AT_LEAST_ONCE => at_least_once_ack _
- case EXACTLY_ONCE => exactly_once_ack _
- case AT_MOST_ONCE => null
- }
-
- if( !route.targets.isEmpty ) {
- val delivery = new Delivery
- delivery.message = RawMessage(publish.payload)
- delivery.persistent = publish.qos().ordinal() > 0
- delivery.size = publish.payload.length
- delivery.ack = ack
- if( publish.retain() ) {
- if( delivery.size == 0 ) {
- delivery.retain = RetainRemove
- } else {
- delivery.retain = RetainSet
- }
- }
-
- // routes can always accept at least 1 delivery...
- assert( !route.full )
- route.offer(delivery)
- if( route.full ) {
- // but once it gets full.. suspend to flow control the producer.
- route.suspended = true
- handler.get.suspend_read("blocked sending to: \
"+route.overflowSessions.mkString(", "))
- }
-
- } else {
- ack(null, null)
- }
- }
-
-
- //
- def publish_will(complete_close: =>Unit) = {
- if(connect_message!=null) {
- if( connect_message.willTopic()==null ) {
- complete_close
- } else {
-
- val destination = decode_destination(connect_message.willTopic())
- val prodcuer = new DeliveryProducerRoute(host.router) {
- override def send_buffer_size = 1024*64
- override def connection = handler.map(_.connection)
- override def dispatch_queue = queue
- refiller = NOOP
- }
-
- host.dispatch_queue {
- host.router.connect(Array(destination), prodcuer, security_context)
- queue {
- if(prodcuer.targets.isEmpty) {
- complete_close
- } else {
- val delivery = new Delivery
- delivery.message = RawMessage(connect_message.willMessage())
- delivery.size = connect_message.willMessage().length
- delivery.persistent = connect_message.willQos().ordinal() > 0
- if( connect_message.willRetain() ) {
- if( delivery.size == 0 ) {
- delivery.retain = RetainRemove
- } else {
- delivery.retain = RetainSet
- }
- }
-
- delivery.ack = (x,y) => {
- host.dispatch_queue {
- host.router.disconnect(Array(destination), prodcuer)
- }
- complete_close
- }
- handler.get.messages_received += 1
- prodcuer.offer(delivery)
- }
- }
- }
- }
- }
- }
- /////////////////////////////////////////////////////////////////////
- //
- // Bits that deal with subscriptions
- //
- /////////////////////////////////////////////////////////////////////
-
- def on_mqtt_subscribe(sub:SUBSCRIBE):Unit = {
- subscribe(sub.topics()) {
- queue {
- session_state.strategy.update {
- val suback = new SUBACK
- suback.messageId(sub.messageId())
- suback.grantedQos(sub.topics().map(_.qos().ordinal().toByte))
- send(suback)
- }
- }
- }
- }
-
- def subscribe(topics:Traversable[Topic])(on_subscribed: => Unit):Unit = {
- var addresses:Array[_ <: BindAddress] = topics.toArray.map { topic =>
- var address:BindAddress = decode_destination(topic.name)
- session_state.subscriptions += topic.name -> (topic, address)
- mqtt_consumer.addresses += address -> topic.qos
- if(PathParser.containsWildCards(address.path)) {
- mqtt_consumer.wildcards.put( address.path, topic.qos() )
- }
- address
- }
-
- handler.get.subscription_count = mqtt_consumer.addresses.size
-
- addresses = if( clean_session ) {
- addresses
- } else {
- session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
null, mqtt_consumer.addresses.keySet.toArray)
- Array(session_state.durable_sub)
- }
-
- host.dispatch_queue {
- addresses.foreach { address=>
- host.router.bind(Array[BindAddress](address), mqtt_consumer, \
security_context) { result =>
- // MQTT ignores subscribe failures.
- }
- }
- on_subscribed
- }
-
- }
-
- def on_mqtt_unsubscribe(unsubscribe:UNSUBSCRIBE):Unit = {
-
- val addresses:Array[_ <: BindAddress] = unsubscribe.topics.flatMap { topic =>
- session_state.subscriptions.remove(topic).map { case (topic, address)=>
- mqtt_consumer.addresses.remove(address)
- if(PathParser.containsWildCards(address.path)) {
- mqtt_consumer.wildcards.remove(address.path, topic.qos)
- }
- address
- }
- }
-
- handler.get.subscription_count = mqtt_consumer.addresses.size
-
- if(!clean_session) {
- session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
null, mqtt_consumer.addresses.keySet.toArray)
- }
-
- host.dispatch_queue {
- if(clean_session) {
- host.router.unbind(addresses, mqtt_consumer, false, security_context)
- } else {
- if( mqtt_consumer.addresses.isEmpty ) {
- host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, true, \
security_context)
- session_state.durable_sub = null
- } else {
- host.router.bind(Array(session_state.durable_sub), mqtt_consumer, \
security_context) { result =>
- }
- }
- }
- queue {
- session_state.strategy.update {
- val ack = new UNSUBACK
- ack.messageId(unsubscribe.messageId())
- send(ack)
- }
- }
- }
-
- }
-
- var publish_body = false
-
- lazy val mqtt_consumer = new MqttConsumer
- class MqttConsumer extends BaseRetained with DeliveryConsumer {
-
- override def toString = "mqtt client:"+client_id+" remote address: \
"+security_context.remote_address
-
- val addresses = HashMap[BindAddress, QoS]()
- val wildcards = new PathMap[QoS]()
-
- val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, \
Int)] {
- def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
- if( previous == null ) {
- event
- } else {
- (previous._1+event._1, previous._2+event._2)
- }
- }
- def mergeEvents(previous:(Int, Int), events:(Int, Int)) = mergeEvent(previous, \
events)
- }, dispatch_queue)
-
- credit_window_source.setEventHandler(^{
- val data = credit_window_source.getData
- credit_window_filter.credit(data._1, data._2)
- });
- credit_window_source.resume
-
- val consumer_sink = new MutableSink[Request]()
- consumer_sink.downstream = None
-
- var next_seq_id = 1L
- def get_next_seq_id = {
- val rc = next_seq_id
- next_seq_id += 1
- rc
- }
-
- def to_message_id(value:Long):Short = (
- 0x8000 | // MQTT message ids cannot be zero, so we always set the highest \
bit.
- (value & 0x7FFF) // the lower 15 bits come for the original seq id.
- ).toShort
-
- val credit_window_filter = new CreditWindowFilter[(Session[Delivery], \
Delivery)](consumer_sink.flatMap{ event =>
- queue.assertExecuting()
- val (session, delivery) = event
-
- session_manager.delivered(session, delivery.size)
-
- // Look up which QoS we need to send this message with..
- var topic = delivery.sender.head.simple
- import collection.JavaConversions._
- addresses.get(topic).orElse(wildcards.get(topic.path).headOption) match {
-
- case None =>
- // draining messages after an un-subscribe
- acked(delivery, Consumed)
- None
-
- case Some(qos) =>
-
- // Convert the Delivery into a Request
- var publish = new PUBLISH
- publish.topicName(new \
UTF8Buffer(destination_parser.encode_destination(Array(delivery.sender.head))))
- if( delivery.redeliveries > 0) {
- publish.dup(true)
- }
-
- if( delivery.message.codec eq RawMessageCodec ) {
- publish.payload(delivery.message.asInstanceOf[RawMessage].payload)
- } else {
- if( publish_body ) {
- publish.payload(delivery.message.getBodyAs(classOf[Buffer]))
- } else {
- publish.payload(delivery.message.encoded)
- }
- }
-
- handler.get.messages_sent += 1
-
- if (delivery.ack!=null && (qos ne AT_MOST_ONCE)) {
- publish.qos(qos)
- val id = to_message_id(if(clean_session) {
- get_next_seq_id // generate our own seq id.
- } else {
- delivery.seq // use the durable sub's seq id..
- })
-
- publish.messageId(id)
- val request = Request(id, publish, (result)=>{acked(delivery, result)})
- in_flight_publishes.put(id, request) match {
- case Some(r) =>
- // A reconnecting client could have acked before
- // we get dispatched by the durable sub.
- if( r.message == null ) {
- in_flight_publishes.remove(id)
- acked(delivery, Consumed)
- } else {
- // Looks we sent out a msg with that id. This could only
- // happen once we send out 0x7FFF message and the first
- // one has not been acked.
- handler.foreach(_.async_die("Client not acking regularly.", null))
- }
- case None =>
- }
-
- Some(request)
-
- } else {
- // This callback gets executed once the message
- // sent to the transport.
- publish.qos(AT_MOST_ONCE)
- Some(Request(0, publish, (result)=>{ acked(delivery, result) }))
- }
- }
-
- }, SessionDeliverySizer)
-
- def acked(delivery:Delivery, result:DeliveryResult) = {
- queue.assertExecuting()
- credit_window_source.merge((delivery.size, 1))
- if( delivery.ack!=null ) {
- delivery.ack(result, null)
- }
- }
-
- credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
-
- val session_manager:SessionSinkMux[Delivery] = new \
SessionSinkMux[Delivery](credit_window_filter, queue, Delivery, Integer.MAX_VALUE/2, \
receive_buffer_size) {
- override def time_stamp = host.broker.now
- }
-
- override def dispose() = queue {
- super.dispose()
- }
-
- def dispatch_queue = queue
- override def connection = handler.map(_.connection)
- override def receive_buffer_size = 1024*64; // handler.codec.getWriteBufferSize
- def is_persistent = false
- def matches(delivery:Delivery):Boolean = true
-
- //
- // Each destination we subscribe to will establish a session with us.
- //
- class MqttConsumerSession(val producer:DeliveryProducer) extends DeliverySession \
with SessionSinkFilter[Delivery] {
- producer.dispatch_queue.assertExecuting()
- retain
-
- val downstream = session_manager.open(producer.dispatch_queue)
-
- override def toString = "connection to \
"+handler.map(_.connection.transport.getRemoteAddress).getOrElse("unconnected")
-
- def consumer = mqtt_consumer
- var closed = false
-
- def close = {
- assert(producer.dispatch_queue.isExecuting)
- if( !closed ) {
- closed = true
- dispose
- }
- }
-
- def dispose = {
- session_manager.close(downstream, (delivery)=>{
- // We have been closed so we have to nak any deliveries.
- if( delivery.ack!=null ) {
- delivery.ack(Undelivered, delivery.uow)
- }
- })
- release
- }
-
- // Delegate all the flow control stuff to the session
- override def full = {
- val rc = super.full
- rc
- }
-
- def offer(delivery:Delivery) = {
- if( full ) {
- false
- } else {
- delivery.message.retain()
- val rc = downstream.offer(delivery)
- assert(rc, "offer should be accepted since it was not full")
- true
- }
- }
-
- }
- def connect(p:DeliveryProducer) = new MqttConsumerSession(p)
- }
-
-}
+///**
+// * An MqttSession can be switch from one connection/protocol handler to another,
+// * but it will only be associated with one at a time. An MqttSession tracks
+// * the state of the communication with a client.
+// *
+// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+// */
+//case class MqttSession(host_state:HostState, client_id:UTF8Buffer, \
session_state:SessionState) { +//
+// import MqttProtocolHandler._
+//
+// def host = host_state.host
+//
+// val queue = createQueue("mqtt: "+client_id)
+// var manager_disconnected = false
+//
+// var handler:Option[MqttProtocolHandler] = None
+// var security_context:SecurityContext = _
+// var clean_session = false
+// var connect_message:CONNECT = _
+// var destination_parser = MqttProtocol.destination_parser
+//
+// def connect(next:MqttProtocolHandler):Unit = queue {
+// if(manager_disconnected) {
+// // we are not the assignment anymore.. go to the session manager
+// // again to setup a new session.
+// MqttSessionManager.attach(host, client_id, next)
+// } else {
+//
+// // so that we don't switch again until this current switch completes
+// queue.suspend()
+// if( handler != None ) {
+// detach
+// handler = None
+// }
+// queue {
+// handler=Some(next)
+// attach
+// }
+//
+// // switch the connection to the session queue..
+// next.connection.set_dispatch_queue(queue) {
+// queue.resume()
+// }
+// }
+// }
+//
+// def disconnect(prev:MqttProtocolHandler) = queue {
+// if( handler==Some(prev) ) {
+// MqttSessionManager.remove(host_state, client_id)
+// manager_disconnected = true
+// detach
+// handler = None
+// }
+// }
+// /////////////////////////////////////////////////////////////////////
+// //
+// // Bits that deal with connections attaching/detaching from the session
+// //
+// /////////////////////////////////////////////////////////////////////
+// def attach = {
+// queue.assertExecuting()
+// val h = handler.get
+// clean_session = h.connect_message.cleanSession()
+// security_context = h.security_context
+// h.command_handler = on_transport_command _
+// destination_parser = h.destination_parser
+// mqtt_consumer.consumer_sink.downstream = Some(h.sink_manager.open)
+//
+// def ack_connect = {
+// queue.assertExecuting()
+// connect_message = h.connect_message
+// val connack = new CONNACK
+// connack.code(CONNECTION_ACCEPTED)
+// send(connack)
+// }
+//
+// if( !clean_session ) {
+// // Setup the previous subscriptions..
+// session_state.strategy.create(host.store, client_id)
+// if( !session_state.subscriptions.isEmpty ) {
+// h.suspend_read("subscribing")
+// subscribe(session_state.subscriptions.map(_._2._1)) {
+// h.resume_read()
+// h.queue {
+// ack_connect
+// }
+// }
+// } else {
+// ack_connect
+// }
+// } else {
+// // do we need to clear the received ids?
+// // durable_session_state.received_message_ids.clear()
+// session_state.subscriptions.clear()
+// if( session_state.durable_sub !=null ) {
+// var addresses = Array(session_state.durable_sub)
+// session_state.durable_sub = null
+// host.dispatch_queue {
+// host.router.delete(addresses, security_context)
+// }
+// }
+// session_state.strategy.destroy {
+// ack_connect
+// }
+// }
+//
+// }
+//
+// def detach:Unit = {
+// queue.assertExecuting()
+//
+// if(!producerRoutes.isEmpty) {
+// import collection.JavaConversions._
+// val routes = producerRoutes.values.toSeq.toArray
+// host.dispatch_queue {
+// routes.foreach { route=>
+// host.router.disconnect(Array(route.address), route)
+// }
+// }
+// producerRoutes.clear
+// }
+//
+// if( clean_session ) {
+// if(!mqtt_consumer.addresses.isEmpty) {
+// var addresses = mqtt_consumer.addresses.keySet.toArray
+// host.dispatch_queue {
+// host.router.unbind(addresses, mqtt_consumer, false , security_context)
+// }
+// mqtt_consumer.addresses.clear()
+// }
+// session_state.subscriptions.clear()
+// } else {
+// if(session_state.durable_sub!=null) {
+// var addresses = Array(session_state.durable_sub)
+// host.dispatch_queue {
+// host.router.unbind(addresses, mqtt_consumer, false , security_context)
+// }
+// mqtt_consumer.addresses.clear()
+// session_state.durable_sub = null
+// }
+// }
+//
+// in_flight_publishes.values.foreach { request =>
+// if( request.ack!=null ) {
+// if(request.delivered) {
+// request.ack(Delivered)
+// } else {
+// request.ack(Undelivered)
+// }
+// }
+// }
+// in_flight_publishes.clear()
+//
+// handler.get.sink_manager.close(mqtt_consumer.consumer_sink.downstream.get, \
(request)=>{}) +// mqtt_consumer.consumer_sink.downstream = None
+//
+// handler.get.on_transport_disconnected()
+// }
+//
+// def decode_destination(value:UTF8Buffer):SimpleAddress = {
+// val rc = destination_parser.decode_single_destination(value.toString, \
(name)=>{ +// SimpleAddress("topic", destination_parser.decode_path(name))
+// })
+// if( rc==null ) {
+// handler.foreach(_.die("Invalid mqtt destination name: "+value))
+// }
+// rc
+// }
+//
+// /////////////////////////////////////////////////////////////////////
+// //
+// // Bits that deal with assigning message ids to QoS > 0 requests
+// // and tracking those requests so that they can get replayed on a
+// // reconnect.
+// //
+// /////////////////////////////////////////////////////////////////////
+//
+// var in_flight_publishes = HashMap[Short, Request]()
+//
+// def send(message: MessageSupport.Message): Unit = {
+// queue.assertExecuting()
+// handler.foreach(_.connection_sink.offer(Request(0, message, null)))
+// }
+//
+// def publish_completed(id: Short): Unit = {
+// queue.assertExecuting()
+// in_flight_publishes.remove(id) match {
+// case Some(request) =>
+// if ( request.ack != null ) {
+// request.ack(Consumed)
+// }
+// case None =>
+// // It's possible that on a reconnect, we get an ACK
+// // in for message that was not dispatched yet. store
+// // a place holder so we ack it upon the dispatch
+// // attempt.
+// in_flight_publishes.put(id, Request(id, null, null))
+// }
+// }
+//
+// /////////////////////////////////////////////////////////////////////
+// //
+// // Bits that deal with processing new messages from the client.
+// //
+// /////////////////////////////////////////////////////////////////////
+// def on_transport_command(command:AnyRef):Unit = command match {
+// case command:MQTTFrame=>
+//
+// command.messageType() match {
+//
+// case PUBLISH.TYPE =>
+// on_mqtt_publish(received(new PUBLISH().decode(command)))
+//
+// // This follows a Publish with QoS EXACTLY_ONCE
+// case PUBREL.TYPE =>
+// var ack = received(new PUBREL().decode(command))
+// // TODO: perhaps persist the processed list.. otherwise
+// // we can't filter out dups after a broker restart.
+// session_state.received_message_ids.remove(ack.messageId)
+// session_state.strategy.update {
+// send(new PUBCOMP().messageId(ack.messageId))
+// }
+//
+// case SUBSCRIBE.TYPE =>
+// on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)))
+//
+// case UNSUBSCRIBE.TYPE =>
+// on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)))
+//
+// // AT_LEAST_ONCE ack flow for a client subscription
+// case PUBACK.TYPE =>
+// val ack = received(new PUBACK().decode(command))
+// publish_completed(ack.messageId)
+//
+// // EXACTLY_ONCE ack flow for a client subscription
+// case PUBREC.TYPE =>
+// val ack = received(new PUBREC().decode(command))
+// send(new PUBREL().messageId(ack.messageId))
+//
+// case PUBCOMP.TYPE =>
+// val ack: PUBCOMP = received(new PUBCOMP().decode(command))
+// publish_completed(ack.messageId)
+//
+// case PINGREQ.TYPE =>
+// received(new PINGREQ().decode(command))
+// send(new PINGRESP())
+//
+// case DISCONNECT.TYPE =>
+// received(new DISCONNECT())
+// MqttSessionManager.disconnect(host_state, client_id, handler.get)
+//
+// case _ =>
+// handler.get.die("Invalid MQTT message type: "+command.messageType());
+// }
+// case "failure" =>
+// // Publish the client's will
+// publish_will {
+// // then disconnect him.
+// MqttSessionManager.disconnect(host_state, client_id, handler.get)
+// }
+//
+// case _=>
+// handler.get.die("Internal Server Error: unexpected mqtt command: \
"+command.getClass); +// }
+//
+// /////////////////////////////////////////////////////////////////////
+// //
+// // Bits that deal with processing PUBLISH messages
+// //
+// /////////////////////////////////////////////////////////////////////
+// var producerRoutes = new LRUCache[UTF8Buffer, MqttProducerRoute](10) {
+// override def onCacheEviction(eldest: Entry[UTF8Buffer, MqttProducerRoute]) = {
+// host.dispatch_queue {
+// host.router.disconnect(Array(eldest.getValue.address), eldest.getValue)
+// }
+// }
+// }
+// case class MqttProducerRoute(address:SimpleAddress, handler:MqttProtocolHandler) \
extends DeliveryProducerRoute(host.router) { +// override def send_buffer_size = \
handler.codec.getReadBufferSize +// override def connection = \
Some(handler.connection) +// override def dispatch_queue = queue
+//
+// var suspended = false
+//
+// refiller = ^ {
+// if( suspended ) {
+// suspended = false
+// handler.resume_read
+// }
+// }
+//
+// }
+//
+// def on_mqtt_publish(publish:PUBLISH):Unit = {
+//
+// if( (publish.qos eq EXACTLY_ONCE) && \
session_state.received_message_ids.contains(publish.messageId)) { +// val \
response = new PUBREC +// response.messageId(publish.messageId)
+// send(response)
+// return
+// }
+//
+// handler.get.messages_received += 1
+//
+// queue.assertExecuting()
+// producerRoutes.get(publish.topicName()) match {
+// case null =>
+// // create the producer route...
+//
+// val destination = decode_destination(publish.topicName())
+// val route = MqttProducerRoute(destination, handler.get)
+//
+// // don't process commands until producer is connected...
+// route.handler.suspend_read("route publish lookup")
+// host.dispatch_queue {
+// host.router.connect(Array(destination), route, security_context)
+// queue {
+// // We don't care if we are not allowed to send..
+// if (!route.handler.connection.stopped) {
+// route.handler.resume_read
+// producerRoutes.put(publish.topicName(), route)
+// send_via_route(route, publish)
+// }
+// }
+// }
+//
+// case route =>
+// // we can re-use the existing producer route
+// send_via_route(route, publish)
+// }
+// }
+//
+// def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
+// queue.assertExecuting()
+//
+// def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
+// val response = new PUBACK
+// response.messageId(publish.messageId)
+// send(response)
+// }
+//
+// def exactly_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
+// queue.assertExecuting()
+// // TODO: perhaps persist the processed list..
+// session_state.received_message_ids.add(publish.messageId)
+// session_state.strategy.update {
+// val response = new PUBREC
+// response.messageId(publish.messageId)
+// send(response)
+// }
+// }
+//
+// val ack = publish.qos match {
+// case AT_LEAST_ONCE => at_least_once_ack _
+// case EXACTLY_ONCE => exactly_once_ack _
+// case AT_MOST_ONCE => null
+// }
+//
+// if( !route.targets.isEmpty ) {
+// val delivery = new Delivery
+// delivery.message = RawMessage(publish.payload)
+// delivery.persistent = publish.qos().ordinal() > 0
+// delivery.size = publish.payload.length
+// delivery.ack = ack
+// if( publish.retain() ) {
+// if( delivery.size == 0 ) {
+// delivery.retain = RetainRemove
+// } else {
+// delivery.retain = RetainSet
+// }
+// }
+//
+// // routes can always accept at least 1 delivery...
+// assert( !route.full )
+// route.offer(delivery)
+// if( route.full ) {
+// // but once it gets full.. suspend to flow control the producer.
+// route.suspended = true
+// handler.get.suspend_read("blocked sending to: \
"+route.overflowSessions.mkString(", ")) +// }
+//
+// } else {
+// ack(null, null)
+// }
+// }
+//
+//
+// //
+// def publish_will(complete_close: =>Unit) = {
+// if(connect_message!=null) {
+// if( connect_message.willTopic()==null ) {
+// complete_close
+// } else {
+//
+// val destination = decode_destination(connect_message.willTopic())
+// val prodcuer = new DeliveryProducerRoute(host.router) {
+// override def send_buffer_size = 1024*64
+// override def connection = handler.map(_.connection)
+// override def dispatch_queue = queue
+// refiller = NOOP
+// }
+//
+// host.dispatch_queue {
+// host.router.connect(Array(destination), prodcuer, security_context)
+// queue {
+// if(prodcuer.targets.isEmpty) {
+// complete_close
+// } else {
+// val delivery = new Delivery
+// delivery.message = RawMessage(connect_message.willMessage())
+// delivery.size = connect_message.willMessage().length
+// delivery.persistent = connect_message.willQos().ordinal() > 0
+// if( connect_message.willRetain() ) {
+// if( delivery.size == 0 ) {
+// delivery.retain = RetainRemove
+// } else {
+// delivery.retain = RetainSet
+// }
+// }
+//
+// delivery.ack = (x,y) => {
+// host.dispatch_queue {
+// host.router.disconnect(Array(destination), prodcuer)
+// }
+// complete_close
+// }
+// handler.get.messages_received += 1
+// prodcuer.offer(delivery)
+// }
+// }
+// }
+// }
+// }
+// }
+// /////////////////////////////////////////////////////////////////////
+// //
+// // Bits that deal with subscriptions
+// //
+// /////////////////////////////////////////////////////////////////////
+//
+// def on_mqtt_subscribe(sub:SUBSCRIBE):Unit = {
+// subscribe(sub.topics()) {
+// queue {
+// session_state.strategy.update {
+// val suback = new SUBACK
+// suback.messageId(sub.messageId())
+// suback.grantedQos(sub.topics().map(_.qos().ordinal().toByte))
+// send(suback)
+// }
+// }
+// }
+// }
+//
+// def subscribe(topics:Traversable[Topic])(on_subscribed: => Unit):Unit = {
+// var addresses:Array[_ <: BindAddress] = topics.toArray.map { topic =>
+// var address:BindAddress = decode_destination(topic.name)
+// session_state.subscriptions += topic.name -> (topic, address)
+// mqtt_consumer.addresses += address -> topic.qos
+// if(PathParser.containsWildCards(address.path)) {
+// mqtt_consumer.wildcards.put( address.path, topic.qos() )
+// }
+// address
+// }
+//
+// handler.get.subscription_count = mqtt_consumer.addresses.size
+//
+// addresses = if( clean_session ) {
+// addresses
+// } else {
+// session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
null, mqtt_consumer.addresses.keySet.toArray) +// \
Array(session_state.durable_sub) +// }
+//
+// host.dispatch_queue {
+// addresses.foreach { address=>
+// host.router.bind(Array[BindAddress](address), mqtt_consumer, \
security_context) { result => +// // MQTT ignores subscribe failures.
+// }
+// }
+// on_subscribed
+// }
+//
+// }
+//
+// def on_mqtt_unsubscribe(unsubscribe:UNSUBSCRIBE):Unit = {
+//
+// val addresses:Array[_ <: BindAddress] = unsubscribe.topics.flatMap { topic =>
+// session_state.subscriptions.remove(topic).map { case (topic, address)=>
+// mqtt_consumer.addresses.remove(address)
+// if(PathParser.containsWildCards(address.path)) {
+// mqtt_consumer.wildcards.remove(address.path, topic.qos)
+// }
+// address
+// }
+// }
+//
+// handler.get.subscription_count = mqtt_consumer.addresses.size
+//
+// if(!clean_session) {
+// session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), \
null, mqtt_consumer.addresses.keySet.toArray) +// }
+//
+// host.dispatch_queue {
+// if(clean_session) {
+// host.router.unbind(addresses, mqtt_consumer, false, security_context)
+// } else {
+// if( mqtt_consumer.addresses.isEmpty ) {
+// host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, \
true, security_context) +// session_state.durable_sub = null
+// } else {
+// host.router.bind(Array(session_state.durable_sub), mqtt_consumer, \
security_context) { result => +// }
+// }
+// }
+// queue {
+// session_state.strategy.update {
+// val ack = new UNSUBACK
+// ack.messageId(unsubscribe.messageId())
+// send(ack)
+// }
+// }
+// }
+//
+// }
+//
+// var publish_body = false
+//
+// lazy val mqtt_consumer = new MqttConsumer
+// class MqttConsumer extends BaseRetained with DeliveryConsumer {
+//
+// override def toString = "mqtt client:"+client_id+" remote address: \
"+security_context.remote_address +//
+// val addresses = HashMap[BindAddress, QoS]()
+// val wildcards = new PathMap[QoS]()
+//
+// val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, \
Int)] { +// def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
+// if( previous == null ) {
+// event
+// } else {
+// (previous._1+event._1, previous._2+event._2)
+// }
+// }
+// def mergeEvents(previous:(Int, Int), events:(Int, Int)) = \
mergeEvent(previous, events) +// }, dispatch_queue)
+//
+// credit_window_source.setEventHandler(^{
+// val data = credit_window_source.getData
+// credit_window_filter.credit(data._1, data._2)
+// });
+// credit_window_source.resume
+//
+// val consumer_sink = new MutableSink[Request]()
+// consumer_sink.downstream = None
+//
+// var next_seq_id = 1L
+// def get_next_seq_id = {
+// val rc = next_seq_id
+// next_seq_id += 1
+// rc
+// }
+//
+// def to_message_id(value:Long):Short = (
+// 0x8000 | // MQTT message ids cannot be zero, so we always set the highest \
bit. +// (value & 0x7FFF) // the lower 15 bits come for the original seq id.
+// ).toShort
+//
+// val credit_window_filter = new CreditWindowFilter[(Session[Delivery], \
Delivery)](consumer_sink.flatMap{ event => +// queue.assertExecuting()
+// val (session, delivery) = event
+//
+// session_manager.delivered(session, delivery.size)
+//
+// // Look up which QoS we need to send this message with..
+// var topic = delivery.sender.head.simple
+// import collection.JavaConversions._
+// addresses.get(topic).orElse(wildcards.get(topic.path).headOption) match {
+//
+// case None =>
+// // draining messages after an un-subscribe
+// acked(delivery, Consumed)
+// None
+//
+// case Some(qos) =>
+//
+// // Convert the Delivery into a Request
+// var publish = new PUBLISH
+// publish.topicName(new \
UTF8Buffer(destination_parser.encode_destination(Array(delivery.sender.head)))) +// \
if( delivery.redeliveries > 0) { +// publish.dup(true)
+// }
+//
+// if( delivery.message.codec eq RawMessageCodec ) {
+// publish.payload(delivery.message.asInstanceOf[RawMessage].payload)
+// } else {
+// if( publish_body ) {
+// publish.payload(delivery.message.getBodyAs(classOf[Buffer]))
+// } else {
+// publish.payload(delivery.message.encoded)
+// }
+// }
+//
+// handler.get.messages_sent += 1
+//
+// if (delivery.ack!=null && (qos ne AT_MOST_ONCE)) {
+// publish.qos(qos)
+// val id = to_message_id(if(clean_session) {
+// get_next_seq_id // generate our own seq id.
+// } else {
+// delivery.seq // use the durable sub's seq id..
+// })
+//
+// publish.messageId(id)
+// val request = Request(id, publish, (result)=>{acked(delivery, \
result)}) +// in_flight_publishes.put(id, request) match {
+// case Some(r) =>
+// // A reconnecting client could have acked before
+// // we get dispatched by the durable sub.
+// if( r.message == null ) {
+// in_flight_publishes.remove(id)
+// acked(delivery, Consumed)
+// } else {
+// // Looks we sent out a msg with that id. This could only
+// // happen once we send out 0x7FFF message and the first
+// // one has not been acked.
+// handler.foreach(_.async_die("Client not acking regularly.", \
null)) +// }
+// case None =>
+// }
+//
+// Some(request)
+//
+// } else {
+// // This callback gets executed once the message
+// // sent to the transport.
+// publish.qos(AT_MOST_ONCE)
+// Some(Request(0, publish, (result)=>{ acked(delivery, result) }))
+// }
+// }
+//
+// }, SessionDeliverySizer)
+//
+// def acked(delivery:Delivery, result:DeliveryResult) = {
+// queue.assertExecuting()
+// credit_window_source.merge((delivery.size, 1))
+// if( delivery.ack!=null ) {
+// delivery.ack(result, null)
+// }
+// }
+//
+// credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
+//
+// val session_manager:SessionSinkMux[Delivery] = new \
SessionSinkMux[Delivery](credit_window_filter, queue, Delivery, Integer.MAX_VALUE/2, \
receive_buffer_size) { +// override def time_stamp = host.broker.now
+// }
+//
+// override def dispose() = queue {
+// super.dispose()
+// }
+//
+// def dispatch_queue = queue
+// override def connection = handler.map(_.connection)
+// override def receive_buffer_size = 1024*64; // \
handler.codec.getWriteBufferSize +// def is_persistent = false
+// def matches(delivery:Delivery):Boolean = true
+//
+// //
+// // Each destination we subscribe to will establish a session with us.
+// //
+// class MqttConsumerSession(val producer:DeliveryProducer) extends \
DeliverySession with SessionSinkFilter[Delivery] { +// \
producer.dispatch_queue.assertExecuting() +// retain
+//
+// val downstream = session_manager.open(producer.dispatch_queue)
+//
+// override def toString = "connection to \
"+handler.map(_.connection.transport.getRemoteAddress).getOrElse("unconnected") +//
+// def consumer = mqtt_consumer
+// var closed = false
+//
+// def close = {
+// assert(producer.dispatch_queue.isExecuting)
+// if( !closed ) {
+// closed = true
+// dispose
+// }
+// }
+//
+// def dispose = {
+// session_manager.close(downstream, (delivery)=>{
+// // We have been closed so we have to nak any deliveries.
+// if( delivery.ack!=null ) {
+// delivery.ack(Undelivered, delivery.uow)
+// }
+// })
+// release
+// }
+//
+// // Delegate all the flow control stuff to the session
+// override def full = {
+// val rc = super.full
+// rc
+// }
+//
+// def offer(delivery:Delivery) = {
+// if( full ) {
+// false
+// } else {
+// delivery.message.retain()
+// val rc = downstream.offer(delivery)
+// assert(rc, "offer should be accepted since it was not full")
+// true
+// }
+// }
+//
+// }
+// def connect(p:DeliveryProducer) = new MqttConsumerSession(p)
+// }
+//
+//}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic