[prev in list] [next in list] [prev in thread] [next in thread]
List: mina-commits
Subject: [mina-sshd] branch master updated: [SSHD-1070] Limit the amount of
From: gnodet () apache ! org
Date: 2020-09-22 6:34:13
Message-ID: 160075645341.9907.16421717128410382465 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 72d6c00 [SSHD-1070] Limit the amount of data that is kept in memory for \
forwa… (#166) 72d6c00 is described below
commit 72d6c0086d2e86060e82e39b531338473f5195d0
Author: Guillaume Nodet <gnodet@gmail.com>
AuthorDate: Tue Sep 22 08:29:22 2020 +0200
[SSHD-1070] Limit the amount of data that is kept in memory for forwa… (#166)
* Add a switch to choose between sync / async modes for the TcpipServerChannel
* Enable load tests
---
.../java/org/apache/sshd/common/io/IoSession.java | 19 ++++++
sshd-core/pom.xml | 3 -
.../apache/sshd/client/channel/ClientChannel.java | 11 +---
.../sshd/common/channel/SimpleIoOutputStream.java | 67 ++++++++++++++++++++
.../sshd/common/channel/StreamingChannel.java | 37 +++++++++++
.../apache/sshd/common/io/nio2/Nio2Session.java | 39 ++++++++++++
.../org/apache/sshd/core/CoreModuleProperties.java | 17 ++++++
.../sshd/server/forward/TcpipServerChannel.java | 71 +++++++++++++++++-----
.../src/test/java/org/apache/sshd/LoadTest.java | 2 -
9 files changed, 237 insertions(+), 29 deletions(-)
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java \
b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java index \
20dbb46..f8de2b4 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -93,4 +93,23 @@ public interface IoSession extends ConnectionEndpointsIndicator, \
PacketWriter, C
* @throws IOException If failed to shutdown the stream
*/
void shutdownOutputStream() throws IOException;
+
+ /**
+ * Suspend read operations on this session. May do nothing if not supported by \
the session implementation. + *
+ * If the session usage includes a graceful shutdown with messages being \
exchanged, the caller needs to + * take care of resuming reading the input in \
order to actually be able to carry on the conversation with + * the peer.
+ */
+ default void suspendRead() {
+ // Do nothing by default, but can be overridden by implementations
+ }
+
+ /**
+ * Resume read operations on this session. May do nothing if not supported by \
the session implementation. + */
+ default void resumeRead() {
+ // Do nothing by default, but can be overridden by implementations
+ }
+
}
diff --git a/sshd-core/pom.xml b/sshd-core/pom.xml
index 48bffcd..8b4db65 100644
--- a/sshd-core/pom.xml
+++ b/sshd-core/pom.xml
@@ -184,9 +184,6 @@
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
\
<reportsDirectory>${project.build.directory}/surefire-reports-nio2</reportsDirectory>
- <excludes>
- <exclude>**/*LoadTest.java</exclude>
- </excludes>
</configuration>
</plugin>
</plugins>
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java index \
6bd15dc..7897ba7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
@@ -31,6 +31,7 @@ import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.ClientSessionHolder;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
@@ -41,11 +42,7 @@ import org.apache.sshd.common.io.IoOutputStream;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface ClientChannel extends Channel, ClientSessionHolder {
- enum Streaming {
- Async,
- Sync
- }
+public interface ClientChannel extends Channel, StreamingChannel, \
ClientSessionHolder {
@Override
default ClientSession getClientSession() {
@@ -57,10 +54,6 @@ public interface ClientChannel extends Channel, \
ClientSessionHolder {
*/
String getChannelType();
- Streaming getStreaming();
-
- void setStreaming(Streaming streaming);
-
IoOutputStream getAsyncIn();
IoInputStream getAsyncOut();
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java \
new file mode 100644 index 0000000..6fee66a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.io.AbstractIoWriteFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.closeable.AbstractCloseable;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.server.forward.TcpipServerChannel;
+
+/**
+ * An implementation of {@link IoOutputStream} using a synchronous {@link \
ChannelOutputStream}. + *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SimpleIoOutputStream extends AbstractCloseable implements \
IoOutputStream { +
+ protected final ChannelOutputStream os;
+
+ public SimpleIoOutputStream(ChannelOutputStream os) {
+ this.os = os;
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ IoUtils.closeQuietly(os);
+ super.doCloseImmediately();
+ }
+
+ @Override
+ public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ os.write(buffer.array(), buffer.rpos(), buffer.available());
+ os.flush();
+ DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null);
+ f.setValue(true);
+ return f;
+ }
+
+ protected static class DefaultIoWriteFuture extends AbstractIoWriteFuture {
+
+ public DefaultIoWriteFuture(Object id, Object lock) {
+ super(id, lock);
+ }
+
+ }
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java new \
file mode 100644 index 0000000..e2d7b94
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+/**
+ * A channel that can be either configured to use synchronous or asynchronous \
streams. + *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface StreamingChannel {
+
+ enum Streaming {
+ Async,
+ Sync
+ }
+
+ Streaming getStreaming();
+
+ void setStreaming(Streaming streaming);
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java \
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index \
84d0468..2200ba2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -70,6 +70,9 @@ public class Nio2Session extends AbstractCloseable implements \
IoSession { private final AtomicLong lastReadCycleStart = new AtomicLong();
private final AtomicLong writeCyclesCounter = new AtomicLong();
private final AtomicLong lastWriteCycleStart = new AtomicLong();
+ private final Object suspendLock = new Object();
+ private volatile boolean suspend;
+ private volatile Runnable readRunnable;
public Nio2Session(
Nio2Service service, FactoryManager manager, IoHandler \
handler, AsynchronousSocketChannel socket, @@ -382,7 +385,43 @@ public class \
Nio2Session extends AbstractCloseable implements IoSession { exceptionCaught(exc);
}
+ @Override
+ public void suspendRead() {
+ log.trace("suspendRead({})", this);
+ boolean prev = suspend;
+ suspend = true;
+ if (!prev) {
+ log.debug("suspendRead({}) requesting read suspension", this);
+ }
+ }
+
+ @Override
+ public void resumeRead() {
+ log.trace("resumeRead({})", this);
+ if (suspend) {
+ Runnable runnable;
+ synchronized (suspendLock) {
+ suspend = false;
+ runnable = readRunnable;
+ }
+ if (runnable != null) {
+ log.debug("resumeRead({}) resuming read", this);
+ runnable.run();
+ }
+ }
+ }
+
protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, \
Object> completion) { + if (suspend) {
+ log.debug("doReadCycle({}) suspending reading", this);
+ synchronized (suspendLock) {
+ if (suspend) {
+ readRunnable = () -> doReadCycle(buffer, completion);
+ return;
+ }
+ }
+ }
+
AsynchronousSocketChannel socket = getSocket();
Duration readTimeout = \
CoreModuleProperties.NIO2_READ_TIMEOUT.getRequired(manager); \
readCyclesCounter.incrementAndGet();
diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java \
b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java index \
828bf72..d728c3e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -678,6 +678,23 @@ public final class CoreModuleProperties {
public static final Property<String> X11_BIND_HOST
= Property.string("x11-fwd-bind-host", \
SshdSocketAddress.LOCALHOST_IPV4);
+ /**
+ * Configuration value for the {@link \
org.apache.sshd.server.forward.TcpipServerChannel} to control the higher + * \
threshold for the data to be buffered waiting to be sent. If the buffered data size \
reaches this value, the + * session will pause reading until the data length goes \
below the + * {@link #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW} threshold.
+ */
+ public static final Property<Long> \
TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH + = \
Property.long_("tcpip-server-channel-buffer-size-threshold-high", 1024 * 1024); +
+ /**
+ * The lower threshold. If not set, half the higher threshold will be used.
+ *
+ * @see #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH
+ */
+ public static final Property<Long> \
TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW + = \
Property.long_("tcpip-server-channel-buffer-size-threshold-low"); +
private CoreModuleProperties() {
throw new UnsupportedOperationException("No instance");
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java \
index a64eaf3..e14775a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
@@ -34,17 +35,22 @@ import org.apache.sshd.common.channel.BufferedIoOutputStream;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelFactory;
+import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.channel.SimpleIoOutputStream;
+import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.channel.Window;
import org.apache.sshd.common.channel.exception.SshChannelOpenException;
import org.apache.sshd.common.forward.Forwarder;
import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider;
import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoConnectFuture;
import org.apache.sshd.common.io.IoConnector;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoServiceFactory;
import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Readable;
@@ -56,6 +62,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.common.util.threads.CloseableExecutorService;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.channel.AbstractServerChannel;
import org.apache.sshd.server.forward.TcpForwardingFilter.Type;
@@ -64,7 +71,7 @@ import org.apache.sshd.server.forward.TcpForwardingFilter.Type;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class TcpipServerChannel extends AbstractServerChannel implements \
ForwardingTunnelEndpointsProvider { +public class TcpipServerChannel extends \
AbstractServerChannel implements StreamingChannel, ForwardingTunnelEndpointsProvider \
{
public abstract static class TcpipFactory implements ChannelFactory, \
ExecutorServiceCarrier {
@@ -102,6 +109,8 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward private SshdSocketAddress tunnelExit;
private SshdSocketAddress originatorAddress;
private SocketAddress localAddress;
+ private final AtomicLong inFlightDataSize = new AtomicLong();
+ private Streaming streaming = Streaming.Sync;
public TcpipServerChannel(ForwardingFilter.Type type, CloseableExecutorService \
executor) { super("", Collections.emptyList(), executor);
@@ -121,6 +130,16 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward }
@Override
+ public Streaming getStreaming() {
+ return streaming;
+ }
+
+ @Override
+ public void setStreaming(Streaming streaming) {
+ this.streaming = streaming;
+ }
+
+ @Override
public SshdSocketAddress getTunnelEntrance() {
return tunnelEntrance;
}
@@ -195,19 +214,27 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward throw new RuntimeSshException(e);
}
- out = new BufferedIoOutputStream(
- "tcpip channel", new ChannelAsyncOutputStream(this, \
SshConstants.SSH_MSG_CHANNEL_DATA) {
- @SuppressWarnings("synthetic-access")
- @Override
- protected CloseFuture doCloseGracefully() {
- try {
- sendEof();
- } catch (IOException e) {
- session.exceptionCaught(e);
- }
- return super.doCloseGracefully();
+ if (streaming == Streaming.Async) {
+ out = new BufferedIoOutputStream(
+ "tcpip channel", new ChannelAsyncOutputStream(this, \
SshConstants.SSH_MSG_CHANNEL_DATA) { + \
@SuppressWarnings("synthetic-access") + @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ session.exceptionCaught(e);
}
- });
+ return super.doCloseGracefully();
+ }
+ });
+ } else {
+ this.out = new SimpleIoOutputStream(new ChannelOutputStream(
+ this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, \
true)); +
+ }
+ long thresholdHigh = \
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this);
+ long thresholdLow = \
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh \
/ 2); IoHandler handler = new IoHandler() {
@Override
@SuppressWarnings("synthetic-access")
@@ -217,9 +244,23 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward
log.debug("doInit({}) Ignoring write to channel in CLOSING \
state", TcpipServerChannel.this); }
} else {
- Buffer buffer = new ByteArrayBuffer(message.available(), false);
+ int length = message.available();
+ Buffer buffer = new ByteArrayBuffer(length, false);
buffer.putBuffer(message);
- out.writePacket(buffer);
+ long total = inFlightDataSize.addAndGet(length);
+ if (total > thresholdHigh) {
+ session.suspendRead();
+ }
+ IoWriteFuture ioWriteFuture = out.writePacket(buffer);
+ ioWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() \
{ + @Override
+ public void operationComplete(IoWriteFuture future) {
+ long total = inFlightDataSize.addAndGet(-length);
+ if (total <= thresholdLow) {
+ session.resumeRead();
+ }
+ }
+ });
}
}
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java \
b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java index 4948fe0..7afae32 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -127,8 +127,6 @@ public class LoadTest extends BaseTestSupport {
try (SshClient client = setupTestFullSupportClient()) {
CoreModuleProperties.MAX_PACKET_SIZE.set(client, 1024L * 16);
CoreModuleProperties.WINDOW_SIZE.set(client, 1024L * 8);
- client.setKeyExchangeFactories(Collections.singletonList(ClientBuilder.DH2KEX.apply(BuiltinDHFactories.dhg1)));
- client.setCipherFactories(Collections.singletonList(BuiltinCiphers.blowfishcbc));
client.start();
try (ClientSession session
= client.connect(getCurrentTestName(), TEST_LOCALHOST, \
port).verify(CONNECT_TIMEOUT).getSession()) {
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic