
List:       activemq-users
Subject:    Re: Artemis 2.21+ - MQTT devices don’t receive the messages m
From:       Justin Bertram <jbertram () apache ! org>
Date:       2022-05-02 18:37:27
Message-ID: CAF+kE=TicFYa00e-AV5OLpocN15Jw_+EjYoiA0o=w5vb=dxmQA () mail ! gmail ! com


> Now the $EDC/# is not working due to ARTEMIS-3801 right?

I believe that is the case, yes.
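
For illustration, the matching rule at issue (MQTT-4.7.2-1) and the
two-subscription approach can be sketched in plain Java. This is only a
minimal sketch, not Artemis code: the class name MqttTopicMatcher and its
methods are hypothetical, and it assumes well-formed filters (a "#" appears
only as the last level).

```java
import java.util.List;

final class MqttTopicMatcher {

    private MqttTopicMatcher() {
    }

    /** Returns true if topicName matches topicFilter under MQTT 3.1.1/5 wildcard rules. */
    static boolean matches(String topicFilter, String topicName) {
        // MQTT-4.7.2-1: a filter starting with a wildcard (# or +) must not
        // match a topic name beginning with '$'.
        if (topicName.startsWith("$")
                && (topicFilter.startsWith("#") || topicFilter.startsWith("+"))) {
            return false;
        }
        String[] filter = topicFilter.split("/", -1);
        String[] name = topicName.split("/", -1);
        for (int i = 0; i < filter.length; i++) {
            if (filter[i].equals("#")) {
                return true; // multi-level wildcard covers all remaining levels
            }
            if (i >= name.length) {
                return false; // filter has more levels than the topic name
            }
            if (!filter[i].equals("+") && !filter[i].equals(name[i])) {
                return false; // literal level differs
            }
        }
        return filter.length == name.length;
    }

    /** Returns true if at least one of the filters matches topicName. */
    static boolean matchesAny(List<String> filters, String topicName) {
        for (String f : filters) {
            if (matches(f, topicName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String birth = "$EDC/kapua-sys/test-client-1/MQTT/BIRTH";
        System.out.println(matches("#", birth));                          // false
        System.out.println(matches("$EDC/#", birth));                     // true
        System.out.println(matchesAny(List.of("#", "$EDC/#"), birth));    // true
        System.out.println(matchesAny(List.of("#", "$EDC/#"), "foo/bar")); // true
    }
}
```

With the Paho client used in the test code, the equivalent would presumably
be to subscribe the admin client to both filters, for example through
subscribe(String[]) with "#" and "$EDC/#", assuming a broker that includes
the ARTEMIS-3801 fix.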


Justin

On Fri, Apr 29, 2022 at 2:40 AM Modanese, Riccardo
<Riccardo.Modanese@eurotech.com.invalid> wrote:

> From my understanding, our previous subscription
> "#"
> should be changed into 2 different subscriptions:
> "#"
> "$EDC/#"
> 
> Now the $EDC/# is not working due to ARTEMIS-3801 right?
> If so, I think it's not a big problem; we should manage 2 different
> subscriptions instead of one.
> 
> Thanks
> 
> 
> From: Justin Bertram <jbertram@apache.org>
> Date: Tuesday, 26 April 2022 22:24
> To: users@activemq.apache.org <users@activemq.apache.org>
> Subject: Re: Artemis 2.21+ - MQTT devices don't receive the messages
> matching their subscriptions
> I believe I see why this is working for you in ActiveMQ "Classic."
> 
> By default ActiveMQ "Classic" will block any messages published to an MQTT
> topic starting with '$' [1]. However, a configuration element named
> "publishDollarTopics"
> was added [2] a long time ago to assist "legacy MQTT applications"
> (presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no
> mention of topics starting with '$'). You are actually setting this
> parameter in Kapua [3].
> 
> A similar parameter could potentially be added to ActiveMQ Artemis, but
> it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5
> was released. Any application written against MQTT 3.1 has had ample time
> to be updated, and it's not like this is a gray area in the spec. Both
> 3.1.1 and 5 specifications are very clear about how to treat topics that
> start with '$'.
> 
> Is it reasonable to expect that Kapua would change to be spec compliant?
> 
> 
> Justin
> 
> [1] https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
> [2] https://issues.apache.org/jira/browse/AMQ-5292
> [3] https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434
> 
> On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jbertram@apache.org>
> wrote:
> 
> > > It's a huge problem for us since in Kapua we are doing # subscriptions
> > > (for internal components) and using $ as topic prefix (also Kura is using
> > > this in its namespace) and worked perfectly with ActiveMQ 5.x.
> > 
> > This use-case is clearly prohibited by the MQTT specification. It
> > specifically says:
> > 
> > A subscription to "#" will not receive any messages published to a topic
> > beginning with a $
> > 
> > If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> > enforce the MQTT specification appropriately.
> > 
> > > I should evaluate the impact but I'm not so sure we can move to Artemis
> > > with this limitation.
> > 
> > I'm not sure I would categorize this as a "limitation" unless you believe
> > enforcing the specification is a limitation.
> > 
> > For what it's worth the MQTT specification also says:
> > 
> > Applications cannot use a topic with a leading $ character for their own
> > purposes
> > 
> > At this point I recommend you change your application to conform with the
> > specification.
> > 
> > 
> > Justin
> > 
> > On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> > <Riccardo.Modanese@eurotech.com.invalid> wrote:
> > 
> > > It's a huge problem for us since in Kapua we are doing # subscriptions
> > > (for internal components) and using $ as topic prefix (also Kura is using
> > > this in its namespace) and worked perfectly with ActiveMQ 5.x.
> > > 
> > > I should evaluate the impact but I'm not so sure we can move to Artemis
> > > with this limitation.
> > > 
> > > Riccardo
> > > 
> > > From: Justin Bertram <jbertram@apache.org>
> > > Date: Tuesday, 26 April 2022 00:18
> > > To: users@activemq.apache.org <users@activemq.apache.org>
> > > Subject: Re: Artemis 2.21+ - MQTT devices don't receive the messages
> > > matching their subscriptions
> > > I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
> > > 
> > > However, it's worth noting that your code will still not work as it did
> > > before because the previous behavior violated the MQTT 3.1.1
> > > specification.
> > > As noted previously, both MQTT 3.1.1 and 5 specifications contain
> > > MQTT-4.7.2-1 which states:
> > > 
> > > > The Server MUST NOT match Topic Filters starting with a wildcard
> > > > character (# or +) with Topic Names beginning with a $ character.
> > > 
> > > Previously your "test-client-admin" client was subscribing to `#` and
> > > receiving messages from topics beginning with `$`. This won't work
> > > anymore.
> > > 
> > > 
> > > Justin
> > > 
> > > [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
> > > 
> > > On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
> > > <Riccardo.Modanese@eurotech.com.invalid> wrote:
> > > 
> > > > 
> > > > Hello,
> > > > switching from Artemis broker 2.20 to 2.21 we experienced an issue
> > > > with message delivery.
> > > > It looks like MQTT devices don't receive the messages matching their
> > > > subscriptions.
> > > > 
> > > > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> > > > (wildcard addresses modified as (**)) produces the correct output:
> > > > waiting for messages
> > > > Client: test-client-1 - Delivery completed:
> > > > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > > > Client: test-client-admin - Message arrived on topic:
> > > > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > > > Client: test-client-1 - Message arrived on topic:
> > > > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > > > ===
> > > > Client: test-client-2 - Delivery completed:
> > > > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > > > Client: test-client-2 - Message arrived on topic:
> > > > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > > > Client: test-client-admin - Message arrived on topic:
> > > > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > > > ===
> > > > Client: test-client-admin - Delivery completed:
> > > > $EDC/kapua-sys/test-client-1/MQTT/APPS
> > > > Client: test-client-admin - Message arrived on topic:
> > > > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > > > Client: test-client-1 - Message arrived on topic:
> > > > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > > > ...
> > > > 
> > > > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> > > > target the output is:
> > > > waiting for messages
> > > > ===
> > > > Client: test-client-1 - Delivery completed:
> > > > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > > > ===
> > > > Client: test-client-2 - Delivery completed:
> > > > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > > > ===
> > > > Client: test-client-admin - Delivery completed:
> > > > $EDC/kapua-sys/test-client-1/MQTT/APPS
> > > > ...
> > > > 
> > > > So the broker doesn't send any message to the clients.
> > > > 
> > > > Maybe we missed some configuration needed from version 2.21 onward?
> > > > 
> > > > Regards,
> > > > Riccardo Modanese
> > > > 
> > > > 
> > > > 
> > > > (*) The 2.21 and 2.22 default broker.xml configuration file has been
> > > > changed in this way:
> > > > set the broker name (message-broker)
> > > > removed double connector bound to 1883 (the broker with the default
> > > > configuration crashed)
> > > > allow only MQTT protocol for connector bound to 1883 port
> > > > removed broadcast connector and configuration
> > > > added custom wildcard configuration (**)
> > > > 
> > > > (**)
> > > > <wildcard-addresses>
> > > >    <routing-enabled>true</routing-enabled>
> > > >    <delimiter>/</delimiter>
> > > >    <any-words>#</any-words>
> > > >    <single-word>+</single-word>
> > > > </wildcard-addresses>
> > > > 
> > > > (***)
> > > > 
> > > > /*******************************************************************************
> > > >  * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> > > >  *
> > > >  * This program and the accompanying materials are made
> > > >  * available under the terms of the Eclipse Public License 2.0
> > > >  * which is available at https://www.eclipse.org/legal/epl-2.0/
> > > >  *
> > > >  * SPDX-License-Identifier: EPL-2.0
> > > >  *
> > > >  * Contributors:
> > > >  *     Eurotech - initial API and implementation
> > > >  *******************************************************************************/
> > > > package org.eclipse.kapua.qa.common;
> > > > 
> > > > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> > > > import org.eclipse.paho.client.mqttv3.MqttCallback;
> > > > import org.eclipse.paho.client.mqttv3.MqttClient;
> > > > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> > > > import org.eclipse.paho.client.mqttv3.MqttException;
> > > > import org.eclipse.paho.client.mqttv3.MqttMessage;
> > > > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> > > > import org.slf4j.Logger;
> > > > import org.slf4j.LoggerFactory;
> > > > 
> > > > public class TestMqttClient {
> > > > 
> > > >     protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
> > > > 
> > > >     private static final String SERVER_URI = "tcp://localhost:1883";
> > > >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
> > > >     private static final String CLIENT_ID_1 = "test-client-1";
> > > >     private static final String CLIENT_ID_2 = "test-client-2";
> > > >     private static final String USERNAME = "kapua-broker";
> > > >     private static final String PASSWORD = "kapua-password";
> > > >     private static final String USERNAME_ADMIN = "kapua-sys";
> > > >     private static final String PASSWORD_ADMIN = "kapua-password";
> > > > 
> > > >     private TestMqttClient() {
> > > >     }
> > > > 
> > > >     public static void main(String[] argv) throws MqttException {
> > > >         MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
> > > >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
> > > >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
> > > >         clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
> > > >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> > > >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> > > > 
> > > >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
> > > >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> > > >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> > > >         System.out.println("waiting for messages");
> > > >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> > > >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> > > >         clientAdmin.subscribe("#");
> > > > 
> > > >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > > 
> > > >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> > > >         System.out.println("===");
> > > > 
> > > >         clientAdmin.disconnect();
> > > >         client1.disconnect();
> > > >         client2.disconnect();
> > > >     }
> > > > 
> > > >     private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
> > > >         MqttConnectOptions options = new MqttConnectOptions();
> > > >         options.setCleanSession(true);
> > > >         options.setUserName(username);
> > > >         options.setPassword(password.toCharArray());
> > > >         return options;
> > > >     }
> > > > }
> > > > 
> > > > class TestMqttClientCallback implements MqttCallback {
> > > > 
> > > >     private String clientId;
> > > > 
> > > >     TestMqttClientCallback(String clientId) {
> > > >         this.clientId = clientId;
> > > >     }
> > > > 
> > > >     @Override
> > > >     public void messageArrived(String topic, MqttMessage message) throws Exception {
> > > >         System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload()));
> > > >     }
> > > > 
> > > >     @Override
> > > >     public void deliveryComplete(IMqttDeliveryToken token) {
> > > >         System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
> > > >     }
> > > > 
> > > >     @Override
> > > >     public void connectionLost(Throwable cause) {
> > > >         System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
> > > >         cause.printStackTrace();
> > > >     }
> > > > }
> > > > 
> > > 
> > 
> 


