[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mina-commits
Subject:    [mina-sshd] branch master updated: [SSHD-1070] Limit the amount of
From:       gnodet () apache ! org
Date:       2020-09-22 6:34:13
Message-ID: 160075645341.9907.16421717128410382465 () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new 72d6c00  [SSHD-1070] Limit the amount of data that is kept in memory for \
forwa… (#166) 72d6c00 is described below

commit 72d6c0086d2e86060e82e39b531338473f5195d0
Author: Guillaume Nodet <gnodet@gmail.com>
AuthorDate: Tue Sep 22 08:29:22 2020 +0200

    [SSHD-1070] Limit the amount of data that is kept in memory for forwa… (#166)
    
    * Add a switch to choose between sync / async modes for the TcpipServerChannel
    * Enable load tests
---
 .../java/org/apache/sshd/common/io/IoSession.java  | 19 ++++++
 sshd-core/pom.xml                                  |  3 -
 .../apache/sshd/client/channel/ClientChannel.java  | 11 +---
 .../sshd/common/channel/SimpleIoOutputStream.java  | 67 ++++++++++++++++++++
 .../sshd/common/channel/StreamingChannel.java      | 37 +++++++++++
 .../apache/sshd/common/io/nio2/Nio2Session.java    | 39 ++++++++++++
 .../org/apache/sshd/core/CoreModuleProperties.java | 17 ++++++
 .../sshd/server/forward/TcpipServerChannel.java    | 71 +++++++++++++++++-----
 .../src/test/java/org/apache/sshd/LoadTest.java    |  2 -
 9 files changed, 237 insertions(+), 29 deletions(-)

diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java \
b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java index \
                20dbb46..f8de2b4 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -93,4 +93,23 @@ public interface IoSession extends ConnectionEndpointsIndicator, \
                PacketWriter, C
      * @throws IOException If failed to shutdown the stream
      */
     void shutdownOutputStream() throws IOException;
+
+    /**
+     * Suspend read operations on this session. May do nothing if not supported by \
the session implementation. +     *
+     * If the session usage includes a graceful shutdown with messages being \
exchanged, the caller needs to +     * take care of resuming reading the input in \
order to actually be able to carry on the conversation with +     * the peer.
+     */
+    default void suspendRead() {
+        // Do nothing by default, but can be overridden by implementations
+    }
+
+    /**
+     * Resume read operations on this session. May do nothing if not supported by \
the session implementation. +     */
+    default void resumeRead() {
+        // Do nothing by default, but can be overridden by implementations
+    }
+
 }
diff --git a/sshd-core/pom.xml b/sshd-core/pom.xml
index 48bffcd..8b4db65 100644
--- a/sshd-core/pom.xml
+++ b/sshd-core/pom.xml
@@ -184,9 +184,6 @@
                 <configuration>
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
                     \
                <reportsDirectory>${project.build.directory}/surefire-reports-nio2</reportsDirectory>
                
-                    <excludes>
-                        <exclude>**/*LoadTest.java</exclude>
-                    </excludes>
                 </configuration>
             </plugin>
         </plugins>
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java index \
                6bd15dc..7897ba7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
@@ -31,6 +31,7 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.session.ClientSessionHolder;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.StreamingChannel;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoOutputStream;
 
@@ -41,11 +42,7 @@ import org.apache.sshd.common.io.IoOutputStream;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface ClientChannel extends Channel, ClientSessionHolder {
-    enum Streaming {
-        Async,
-        Sync
-    }
+public interface ClientChannel extends Channel, StreamingChannel, \
ClientSessionHolder {  
     @Override
     default ClientSession getClientSession() {
@@ -57,10 +54,6 @@ public interface ClientChannel extends Channel, \
                ClientSessionHolder {
      */
     String getChannelType();
 
-    Streaming getStreaming();
-
-    void setStreaming(Streaming streaming);
-
     IoOutputStream getAsyncIn();
 
     IoInputStream getAsyncOut();
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java \
new file mode 100644 index 0000000..6fee66a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.io.AbstractIoWriteFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.closeable.AbstractCloseable;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.server.forward.TcpipServerChannel;
+
+/**
+ * An implementation of {@link IoOutputStream} using a synchronous {@link \
ChannelOutputStream}. + *
+ * @author     <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SimpleIoOutputStream extends AbstractCloseable implements \
IoOutputStream { +
+    protected final ChannelOutputStream os;
+
+    public SimpleIoOutputStream(ChannelOutputStream os) {
+        this.os = os;
+    }
+
+    @Override
+    protected void doCloseImmediately() {
+        IoUtils.closeQuietly(os);
+        super.doCloseImmediately();
+    }
+
+    @Override
+    public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+        os.write(buffer.array(), buffer.rpos(), buffer.available());
+        os.flush();
+        DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null);
+        f.setValue(true);
+        return f;
+    }
+
+    protected static class DefaultIoWriteFuture extends AbstractIoWriteFuture {
+
+        public DefaultIoWriteFuture(Object id, Object lock) {
+            super(id, lock);
+        }
+
+    }
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java new \
file mode 100644 index 0000000..e2d7b94
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+/**
+ * A channel that can be either configured to use synchronous or asynchronous \
streams. + *
+ * @author     <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface StreamingChannel {
+
+    enum Streaming {
+        Async,
+        Sync
+    }
+
+    Streaming getStreaming();
+
+    void setStreaming(Streaming streaming);
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java \
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index \
                84d0468..2200ba2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -70,6 +70,9 @@ public class Nio2Session extends AbstractCloseable implements \
IoSession {  private final AtomicLong lastReadCycleStart = new AtomicLong();
     private final AtomicLong writeCyclesCounter = new AtomicLong();
     private final AtomicLong lastWriteCycleStart = new AtomicLong();
+    private final Object suspendLock = new Object();
+    private volatile boolean suspend;
+    private volatile Runnable readRunnable;
 
     public Nio2Session(
                        Nio2Service service, FactoryManager manager, IoHandler \
handler, AsynchronousSocketChannel socket, @@ -382,7 +385,43 @@ public class \
Nio2Session extends AbstractCloseable implements IoSession {  exceptionCaught(exc);
     }
 
+    @Override
+    public void suspendRead() {
+        log.trace("suspendRead({})", this);
+        boolean prev = suspend;
+        suspend = true;
+        if (!prev) {
+            log.debug("suspendRead({}) requesting read suspension", this);
+        }
+    }
+
+    @Override
+    public void resumeRead() {
+        log.trace("resumeRead({})", this);
+        if (suspend) {
+            Runnable runnable;
+            synchronized (suspendLock) {
+                suspend = false;
+                runnable = readRunnable;
+            }
+            if (runnable != null) {
+                log.debug("resumeRead({}) resuming read", this);
+                runnable.run();
+            }
+        }
+    }
+
     protected void doReadCycle(ByteBuffer buffer, Nio2CompletionHandler<Integer, \
Object> completion) { +        if (suspend) {
+            log.debug("doReadCycle({}) suspending reading", this);
+            synchronized (suspendLock) {
+                if (suspend) {
+                    readRunnable = () -> doReadCycle(buffer, completion);
+                    return;
+                }
+            }
+        }
+
         AsynchronousSocketChannel socket = getSocket();
         Duration readTimeout = \
CoreModuleProperties.NIO2_READ_TIMEOUT.getRequired(manager);  \
                readCyclesCounter.incrementAndGet();
diff --git a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java \
b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java index \
                828bf72..d728c3e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -678,6 +678,23 @@ public final class CoreModuleProperties {
     public static final Property<String> X11_BIND_HOST
             = Property.string("x11-fwd-bind-host", \
SshdSocketAddress.LOCALHOST_IPV4);  
+    /**
+     * Configuration value for the {@link \
org.apache.sshd.server.forward.TcpipServerChannel} to control the higher +     * \
threshold for the data to be buffered waiting to be sent. If the buffered data size \
reaches this value, the +     * session will pause reading until the data length goes \
below the +     * {@link #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW} threshold.
+     */
+    public static final Property<Long> \
TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH +            = \
Property.long_("tcpip-server-channel-buffer-size-threshold-high", 1024 * 1024); +
+    /**
+     * The lower threshold. If not set, half the higher threshold will be used.
+     * 
+     * @see #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH
+     */
+    public static final Property<Long> \
TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW +            = \
Property.long_("tcpip-server-channel-buffer-size-threshold-low"); +
     private CoreModuleProperties() {
         throw new UnsupportedOperationException("No instance");
     }
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java \
                index a64eaf3..e14775a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
@@ -34,17 +35,22 @@ import org.apache.sshd.common.channel.BufferedIoOutputStream;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.channel.ChannelFactory;
+import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.channel.SimpleIoOutputStream;
+import org.apache.sshd.common.channel.StreamingChannel;
 import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.channel.exception.SshChannelOpenException;
 import org.apache.sshd.common.forward.Forwarder;
 import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider;
 import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoConnectFuture;
 import org.apache.sshd.common.io.IoConnector;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoServiceFactory;
 import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Readable;
@@ -56,6 +62,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.common.util.threads.CloseableExecutorService;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.core.CoreModuleProperties;
 import org.apache.sshd.server.channel.AbstractServerChannel;
 import org.apache.sshd.server.forward.TcpForwardingFilter.Type;
 
@@ -64,7 +71,7 @@ import org.apache.sshd.server.forward.TcpForwardingFilter.Type;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class TcpipServerChannel extends AbstractServerChannel implements \
ForwardingTunnelEndpointsProvider { +public class TcpipServerChannel extends \
AbstractServerChannel implements StreamingChannel, ForwardingTunnelEndpointsProvider \
{  
     public abstract static class TcpipFactory implements ChannelFactory, \
ExecutorServiceCarrier {  
@@ -102,6 +109,8 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward  private SshdSocketAddress tunnelExit;
     private SshdSocketAddress originatorAddress;
     private SocketAddress localAddress;
+    private final AtomicLong inFlightDataSize = new AtomicLong();
+    private Streaming streaming = Streaming.Sync;
 
     public TcpipServerChannel(ForwardingFilter.Type type, CloseableExecutorService \
executor) {  super("", Collections.emptyList(), executor);
@@ -121,6 +130,16 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward  }
 
     @Override
+    public Streaming getStreaming() {
+        return streaming;
+    }
+
+    @Override
+    public void setStreaming(Streaming streaming) {
+        this.streaming = streaming;
+    }
+
+    @Override
     public SshdSocketAddress getTunnelEntrance() {
         return tunnelEntrance;
     }
@@ -195,19 +214,27 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Forward  throw new RuntimeSshException(e);
         }
 
-        out = new BufferedIoOutputStream(
-                "tcpip channel", new ChannelAsyncOutputStream(this, \
                SshConstants.SSH_MSG_CHANNEL_DATA) {
-                    @SuppressWarnings("synthetic-access")
-                    @Override
-                    protected CloseFuture doCloseGracefully() {
-                        try {
-                            sendEof();
-                        } catch (IOException e) {
-                            session.exceptionCaught(e);
-                        }
-                        return super.doCloseGracefully();
+        if (streaming == Streaming.Async) {
+            out = new BufferedIoOutputStream(
+                    "tcpip channel", new ChannelAsyncOutputStream(this, \
SshConstants.SSH_MSG_CHANNEL_DATA) { +                \
@SuppressWarnings("synthetic-access") +                @Override
+                protected CloseFuture doCloseGracefully() {
+                    try {
+                        sendEof();
+                    } catch (IOException e) {
+                        session.exceptionCaught(e);
                     }
-                });
+                    return super.doCloseGracefully();
+                }
+            });
+        } else {
+            this.out = new SimpleIoOutputStream(new ChannelOutputStream(
+                    this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, \
true)); +
+        }
+        long thresholdHigh = \
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this);
 +        long thresholdLow = \
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh \
/ 2);  IoHandler handler = new IoHandler() {
             @Override
             @SuppressWarnings("synthetic-access")
@@ -217,9 +244,23 @@ public class TcpipServerChannel extends AbstractServerChannel \
                implements Forward
                         log.debug("doInit({}) Ignoring write to channel in CLOSING \
state", TcpipServerChannel.this);  }
                 } else {
-                    Buffer buffer = new ByteArrayBuffer(message.available(), false);
+                    int length = message.available();
+                    Buffer buffer = new ByteArrayBuffer(length, false);
                     buffer.putBuffer(message);
-                    out.writePacket(buffer);
+                    long total = inFlightDataSize.addAndGet(length);
+                    if (total > thresholdHigh) {
+                        session.suspendRead();
+                    }
+                    IoWriteFuture ioWriteFuture = out.writePacket(buffer);
+                    ioWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() \
{ +                        @Override
+                        public void operationComplete(IoWriteFuture future) {
+                            long total = inFlightDataSize.addAndGet(-length);
+                            if (total <= thresholdLow) {
+                                session.resumeRead();
+                            }
+                        }
+                    });
                 }
             }
 
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java \
b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java index 4948fe0..7afae32 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -127,8 +127,6 @@ public class LoadTest extends BaseTestSupport {
         try (SshClient client = setupTestFullSupportClient()) {
             CoreModuleProperties.MAX_PACKET_SIZE.set(client, 1024L * 16);
             CoreModuleProperties.WINDOW_SIZE.set(client, 1024L * 8);
-            client.setKeyExchangeFactories(Collections.singletonList(ClientBuilder.DH2KEX.apply(BuiltinDHFactories.dhg1)));
                
-            client.setCipherFactories(Collections.singletonList(BuiltinCiphers.blowfishcbc));
  client.start();
             try (ClientSession session
                     = client.connect(getCurrentTestName(), TEST_LOCALHOST, \
port).verify(CONNECT_TIMEOUT).getSession()) {


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic