[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: [1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6339
From: tabish () apache ! org
Date: 2016-06-30 19:14:21
Message-ID: 6d591b2060384256a02dcd12e7b8965e () git ! apache ! org
[Download RAW message or body]
Repository: activemq
Updated Branches:
refs/heads/master 83827f277 -> 31c55f751
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java \
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java \
index 4110fcb..3029668 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -23,6 +23,8 @@ import java.util.Map;
import javax.servlet.Servlet;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.transport.SocketConnectorFactory;
import org.apache.activemq.transport.WebTransportServerSupport;
@@ -41,10 +43,13 @@ import org.slf4j.LoggerFactory;
* Creates a web server and registers web socket server
*
*/
-public class WSTransportServer extends WebTransportServerSupport {
+public class WSTransportServer extends WebTransportServerSupport implements \
BrokerServiceAware {
private static final Logger LOG = \
LoggerFactory.getLogger(WSTransportServer.class);
+ private BrokerService brokerService;
+ private WSServlet servlet;
+
public WSTransportServer(URI location) {
super(location);
this.bindAddress = location;
@@ -105,8 +110,10 @@ public class WSTransportServer extends WebTransportServerSupport \
{ }
private Servlet createWSServlet() throws Exception {
- WSServlet servlet = new WSServlet();
+ servlet = new WSServlet();
servlet.setTransportOptions(transportOptions);
+ servlet.setBrokerService(brokerService);
+
return servlet;
}
@@ -147,4 +154,12 @@ public class WSTransportServer extends WebTransportServerSupport \
{ public boolean isSslServer() {
return false;
}
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ if (servlet != null) {
+ servlet.setBrokerService(brokerService);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java \
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java \
index 338be98..1f7c5e7 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
@@ -18,17 +18,26 @@
package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
-import java.util.*;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.activemq.jms.pool.IntrospectionSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.util.HttpTransportUtils;
+import org.apache.activemq.transport.ws.WSTransportProxy;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -39,16 +48,21 @@ import \
org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; /**
* Handle connection upgrade requests and creates web sockets
*/
-public class WSServlet extends WebSocketServlet {
+public class WSServlet extends WebSocketServlet implements BrokerServiceAware {
private static final long serialVersionUID = -4716657876092884139L;
private TransportAcceptListener listener;
- private final static Map<String, Integer> stompProtocols = new \
ConcurrentHashMap<> ();
- private final static Map<String, Integer> mqttProtocols = new \
ConcurrentHashMap<> (); + private final static Map<String, Integer> stompProtocols \
= new ConcurrentHashMap<>(); + private final static Map<String, Integer> \
mqttProtocols = new ConcurrentHashMap<>();
private Map<String, Object> transportOptions;
+ private BrokerService brokerService;
+
+ private enum Protocol {
+ MQTT, STOMP, UNKNOWN
+ }
static {
stompProtocols.put("v12.stomp", 3);
@@ -80,41 +94,98 @@ public class WSServlet extends WebSocketServlet {
@Override
public Object createWebSocket(ServletUpgradeRequest req, \
ServletUpgradeResponse resp) { WebSocketListener socket;
- boolean isMqtt = false;
- for (String subProtocol : req.getSubProtocols()) {
- if (subProtocol.startsWith("mqtt")) {
- isMqtt = true;
+ Protocol requestedProtocol = Protocol.UNKNOWN;
+
+ // When no sub-protocol is requested we default to STOMP for legacy \
reasons. + if (!req.getSubProtocols().isEmpty()) {
+ for (String subProtocol : req.getSubProtocols()) {
+ if (subProtocol.startsWith("mqtt")) {
+ requestedProtocol = Protocol.MQTT;
+ } else if (subProtocol.contains("stomp")) {
+ requestedProtocol = Protocol.STOMP;
+ }
}
- }
- if (isMqtt) {
- socket = new \
MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
- \
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), \
"mqtt"));
- ((MQTTSocket)socket).setTransportOptions(new \
HashMap(transportOptions));
- ((MQTTSocket)socket).setPeerCertificates(req.getCertificates());
} else {
- socket = new \
StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
- ((StompSocket)socket).setCertificates(req.getCertificates());
- \
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols,req.getSubProtocols(), \
"stomp")); + requestedProtocol = Protocol.STOMP;
+ }
+
+ switch (requestedProtocol) {
+ case MQTT:
+ socket = new \
MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); \
+ ((MQTTSocket) socket).setTransportOptions(new \
HashMap<String, Object>(transportOptions)); + ((MQTTSocket) \
socket).setPeerCertificates(req.getCertificates()); + \
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, \
req.getSubProtocols(), "mqtt")); + break;
+ case UNKNOWN:
+ socket = findWSTransport(req, resp);
+ if (socket != null) {
+ break;
+ }
+ case STOMP:
+ socket = new \
StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); \
+ ((StompSocket) \
socket).setPeerCertificates(req.getCertificates()); + \
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols, \
req.getSubProtocols(), "stomp")); + break;
+ default:
+ socket = null;
+ listener.onAcceptError(new IOException("Unknown protocol \
requested")); + break;
+ }
+
+ if (socket != null) {
+ listener.onAccept((Transport) socket);
}
- listener.onAccept((Transport) socket);
+
return socket;
}
});
}
- private String getAcceptedSubProtocol(final Map<String, Integer> protocols,
- List<String> subProtocols, String defaultProtocol) {
+ private WebSocketListener findWSTransport(ServletUpgradeRequest request, \
ServletUpgradeResponse response) { + WSTransportProxy proxy = null;
+
+ for (String subProtocol : request.getSubProtocols()) {
+ try {
+ String remoteAddress = \
HttpTransportUtils.generateWsRemoteAddress(request.getHttpServletRequest(), \
subProtocol); + URI remoteURI = new URI(remoteAddress);
+
+ TransportFactory factory = \
TransportFactory.findTransportFactory(remoteURI); +
+ if (factory instanceof BrokerServiceAware) {
+ ((BrokerServiceAware) factory).setBrokerService(brokerService);
+ }
+
+ Transport transport = factory.doConnect(remoteURI);
+
+ proxy = new WSTransportProxy(remoteAddress, transport);
+ proxy.setPeerCertificates(request.getCertificates());
+ proxy.setTransportOptions(transportOptions);
+
+ response.setAcceptedSubProtocol(proxy.getSubProtocol());
+ } catch (Exception e) {
+ proxy = null;
+
+ // Keep going and try any other sub-protocols present.
+ continue;
+ }
+ }
+
+ return proxy;
+ }
+
+ private String getAcceptedSubProtocol(final Map<String, Integer> protocols, \
List<String> subProtocols, String defaultProtocol) { List<SubProtocol> \
matchedProtocols = new ArrayList<>(); if (subProtocols != null && \
subProtocols.size() > 0) {
- //detect which subprotocols match accepted protocols and add to the list
+ // detect which subprotocols match accepted protocols and add to the
+ // list
for (String subProtocol : subProtocols) {
Integer priority = protocols.get(subProtocol);
- if(subProtocol != null && priority != null) {
- //only insert if both subProtocol and priority are not null
+ if (subProtocol != null && priority != null) {
+ // only insert if both subProtocol and priority are not null
matchedProtocols.add(new SubProtocol(subProtocol, priority));
}
}
- //sort the list by priority
+ // sort the list by priority
if (matchedProtocols.size() > 0) {
Collections.sort(matchedProtocols, new Comparator<SubProtocol>() {
@Override
@@ -131,6 +202,7 @@ public class WSServlet extends WebSocketServlet {
private class SubProtocol {
private String protocol;
private Integer priority;
+
public SubProtocol(String protocol, Integer priority) {
this.protocol = protocol;
this.priority = priority;
@@ -140,4 +212,9 @@ public class WSServlet extends WebSocketServlet {
public void setTransportOptions(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java \
b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
index 34e8502..05a8159 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java
@@ -22,6 +22,8 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
@@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport;
/**
* Factory for Secure WebSocket (wss) transport
*/
-public class WSSTransportFactory extends TransportFactory {
+public class WSSTransportFactory extends TransportFactory implements \
BrokerServiceAware { +
+ private BrokerService brokerService;
@Override
public TransportServer doBind(URI location) throws IOException {
@@ -44,9 +48,15 @@ public class WSSTransportFactory extends TransportFactory {
IntrospectionSupport.setProperties(result, transportOptions);
result.setTransportOption(transportOptions);
result.setHttpOptions(httpOptions);
+ result.setBrokerService(brokerService);
return result;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java \
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java \
index fdbf867..c4e8c47 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
@@ -250,9 +250,6 @@ public class MQTTWSConnection extends WebSocketAdapter \
implements WebSocketListe }
}
- /* (non-Javadoc)
- * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, \
java.lang.String)
- */
@Override
public void onWebSocketClose(int statusCode, String reason) {
LOG.trace("MQTT WS Connection closed, code:{} message:{}", statusCode, \
reason); @@ -263,14 +260,9 @@ public class MQTTWSConnection extends WebSocketAdapter \
implements WebSocketListe
}
- /* (non-Javadoc)
- * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
- */
@Override
- public void onWebSocketConnect(
- org.eclipse.jetty.websocket.api.Session session) {
+ public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) \
{ this.connection = session;
this.connectLatch.countDown();
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java \
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
index 83cbd69..844c661 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
@@ -43,13 +43,9 @@ public class StompWSConnectionTimeoutTest extends \
WSTransportTestSupport { super.setUp();
wsStompConnection = new StompWSConnection();
-// WebSocketClientFactory clientFactory = new WebSocketClientFactory();
-// clientFactory.start();
wsClient = new WebSocketClient();
wsClient.start();
-
-
wsClient.connect(wsStompConnection, wsConnectUri);
if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to STOMP WS endpoint");
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java \
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
index edf7b6c..c83f24d 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -69,18 +69,16 @@ public class WSTransportTestSupport {
LOG.info("========== Finished test: {} ==========", name.getMethodName());
}
-// protected String getWSConnectorURI() {
-// return "ws://127.0.0.1:" + getProxyPort() +
-// "?allowLinkStealing=" + isAllowLinkStealing() +
-// "&websocket.maxTextMessageSize=99999&" +
-// "transport.maxIdleTime=1001";
-// }
+ protected String getWSConnectionURI() {
+ return "ws://127.0.0.1:" + getProxyPort();
+ }
protected String getWSConnectorURI() {
return "ws://127.0.0.1:" + getProxyPort() +
- "?allowLinkStealing=" + isAllowLinkStealing() +
- "&websocket.maxTextMessageSize=99999&" +
- "transport.idleTimeout=1001";
+ "?allowLinkStealing=" + isAllowLinkStealing() +
+ "&websocket.maxTextMessageSize=99999" +
+ "&transport.idleTimeout=1001" +
+ "&trace=true&transport.trace=true";
}
protected boolean isAllowLinkStealing() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/resources/log4j.properties \
b/activemq-http/src/test/resources/log4j.properties index aa64270..2aabf97 100755
--- a/activemq-http/src/test/resources/log4j.properties
+++ b/activemq-http/src/test/resources/log4j.properties
@@ -20,8 +20,10 @@
#
log4j.rootLogger=INFO, out, stdout
-log4j.logger.org.apache.activemq.transport.ws=DEBUG
+log4j.logger.org.apache.activemq.transport.ws=TRACE
log4j.logger.org.apache.activemq.transport.http=DEBUG
+log4j.logger.org.apache.activemq.transport.amqp=TRACE
+log4j.logger.org.apache.activemq.transport.amqp.FRAMES=TRACE
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index ef3f003..a5f114e 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -64,10 +64,6 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>activemq-partition</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java \
b/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
deleted file mode 100644
index c4a2fd7..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.conversions;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.qpid.jms.JmsConnectionFactory;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AmqpAndMqttTest {
-
- protected BrokerService broker;
- private TransportConnector amqpConnector;
- private TransportConnector mqttConnector;
-
- @Before
- public void setUp() throws Exception {
- broker = createBroker();
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
- broker.setSchedulerSupport(false);
-
- amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
- mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
-
- return broker;
- }
-
- @Test(timeout = 60000)
- public void testFromMqttToAmqp() throws Exception {
- Connection amqp = createAmqpConnection();
- Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = \
session.createConsumer(session.createTopic("topic://FOO"));
-
- final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
- mqtt.connect();
- byte[] payload = bytes("Hello World");
- mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
- mqtt.disconnect();
-
- Message msg = consumer.receive(1000 * 5);
- assertNotNull(msg);
- assertTrue(msg instanceof BytesMessage);
-
- BytesMessage bmsg = (BytesMessage) msg;
- byte[] actual = new byte[(int) bmsg.getBodyLength()];
- bmsg.readBytes(actual);
- assertTrue(Arrays.equals(actual, payload));
- amqp.close();
- }
-
- private byte[] bytes(String value) {
- try {
- return value.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected MQTT createMQTTConnection() throws Exception {
- MQTT mqtt = new MQTT();
- mqtt.setConnectAttemptsMax(1);
- mqtt.setReconnectAttemptsMax(0);
- mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
- return mqtt;
- }
-
- public Connection createAmqpConnection() throws Exception {
-
- String amqpURI = "amqp://localhost:" + \
amqpConnector.getConnectUri().getPort();
-
- final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
-
- factory.setUsername("admin");
- factory.setPassword("password");
-
- final Connection connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.start();
- return connection;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java \
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java \
index 8fb70ec..24a1dcd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java
@@ -17,26 +17,27 @@
package org.apache.activemq.transport;
import java.io.IOException;
+import java.security.cert.X509Certificate;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
-/**
- *
- *
- */
public class StubTransport extends TransportSupport {
private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
private volatile int receiveCounter;
+ @Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
+ @Override
protected void doStart() throws Exception {
}
+ @Override
public void oneway(Object command) throws IOException {
receiveCounter++;
queue.add(command);
@@ -46,12 +47,28 @@ public class StubTransport extends TransportSupport {
return queue;
}
+ @Override
public String getRemoteAddress() {
return null;
}
+ @Override
public int getReceiveCounter() {
return receiveCounter;
}
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java \
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
index 9054e1a..82d87ba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java
@@ -35,7 +35,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-
@RunWith(Parameterized.class)
public class AutoTransportConfigureTest {
@@ -49,12 +48,7 @@ public class AutoTransportConfigureTest {
@Parameters
public static Iterable<Object[]> parameters() {
- return Arrays.asList(new Object[][]{
- {"auto"},
- {"auto+nio"},
- {"auto+ssl"},
- {"auto+nio+ssl"}
- });
+ return Arrays.asList(new Object[][] { { "auto" }, { "auto+nio" }, { \
"auto+ssl" }, { "auto+nio+ssl" } }); }
private String transportType;
@@ -76,7 +70,7 @@ public class AutoTransportConfigureTest {
}
@After
- public void tearDown() throws Exception{
+ public void tearDown() throws Exception {
if (this.brokerService != null) {
this.brokerService.stop();
this.brokerService.waitUntilStopped();
@@ -92,7 +86,7 @@ public class AutoTransportConfigureTest {
}
- @Test(expected=JMSException.class)
+ @Test(expected = JMSException.class)
public void testUrlConfiguration() throws Exception {
createBroker(transportType + "://localhost:0?wireFormat.maxFrameSize=10");
@@ -100,7 +94,7 @@ public class AutoTransportConfigureTest {
sendMessage(factory.createConnection());
}
- @Test(expected=JMSException.class)
+ @Test(expected = JMSException.class)
public void testUrlConfigurationOpenWireFail() throws Exception {
createBroker(transportType + \
"://localhost:0?wireFormat.default.maxFrameSize=10");
@@ -110,17 +104,17 @@ public class AutoTransportConfigureTest {
@Test
public void testUrlConfigurationOpenWireSuccess() throws Exception {
- //Will work because max frame size only applies to amqp
- createBroker(transportType + \
"://localhost:0?wireFormat.amqp.maxFrameSize=10"); + // Will work because max \
frame size only applies to stomp + createBroker(transportType + \
"://localhost:0?wireFormat.stomp.maxFrameSize=10");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
- @Test(expected=JMSException.class)
+ @Test(expected = JMSException.class)
public void testUrlConfigurationOpenWireNotAvailable() throws Exception {
- //only amqp is available so should fail
- createBroker(transportType + "://localhost:0?auto.protocols=amqp");
+ // only stomp is available so should fail
+ createBroker(transportType + "://localhost:0?auto.protocols=stomp");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
@@ -128,7 +122,7 @@ public class AutoTransportConfigureTest {
@Test
public void testUrlConfigurationOpenWireAvailable() throws Exception {
- //only open wire is available
+ // only open wire is available
createBroker(transportType + "://localhost:0?auto.protocols=default");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
@@ -137,13 +131,12 @@ public class AutoTransportConfigureTest {
@Test
public void testUrlConfigurationOpenWireAndAmqpAvailable() throws Exception {
- createBroker(transportType + "://localhost:0?auto.protocols=default,amqp");
+ createBroker(transportType + "://localhost:0?auto.protocols=default,stomp");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
-
protected void sendMessage(Connection connection) throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -152,5 +145,4 @@ public class AutoTransportConfigureTest {
message.setText("this is a test");
producer.send(message);
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cf24896..f3c47ae 100755
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.13.0</qpid-proton-version>
<qpid-jms-version>0.9.0</qpid-jms-version>
- <netty-all-version>4.0.33.Final</netty-all-version>
+ <netty-all-version>4.0.37.Final</netty-all-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.5.1-5</saxon-version>
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic