[prev in list] [next in list] [prev in thread] [next in thread]
List: mina-commits
Subject: [mina-sshd] branch master updated: [SSHD-1080] Rework the PacketWriter to split according to the various semantics
From: gnodet () apache ! org
Date: 2020-09-22 9:39:02
Message-ID: 160076754219.12208.5520800603262541640 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 8e59075 [SSHD-1080] Rework the PacketWriter to split according to the various semantics
8e59075 is described below
commit 8e59075bc55c497e4d4056f457e4d657d3c61c1a
Author: Guillaume Nodet <gnodet@gmail.com>
AuthorDate: Tue Sep 22 11:38:55 2020 +0200
[SSHD-1080] Rework the PacketWriter to split according to the various semantics
---
CHANGES.md | 1 +
.../org/apache/sshd/common/io/IoInputStream.java | 5 ++
.../org/apache/sshd/common/io/IoOutputStream.java | 22 +++++-
.../java/org/apache/sshd/common/io/IoSession.java | 30 ++++++--
.../org/apache/sshd/common/io/PacketWriter.java | 81 ----------------------
...ter.java => ThrottlingChannelStreamWriter.java} | 35 +++++-----
...java => ThrottlingChannelStreamWriterTest.java} | 37 +++++-----
.../java/org/apache/sshd/common/BaseBuilder.java | 8 +--
.../org/apache/sshd/common/FactoryManager.java | 4 +-
.../sshd/common/channel/AbstractChannel.java | 18 ++---
.../common/channel/BufferedIoOutputStream.java | 6 +-
.../org/apache/sshd/common/channel/Channel.java | 19 +++--
.../common/channel/ChannelAsyncOutputStream.java | 12 ++--
.../sshd/common/channel/ChannelOutputStream.java | 8 +--
.../sshd/common/channel/SimpleIoOutputStream.java | 5 +-
.../sshd/common/channel/StreamingChannel.java | 2 +-
.../channel/throttle/ChannelStreamWriter.java | 48 +++++++++++++
...olver.java => ChannelStreamWriterResolver.java} | 12 ++--
...ava => ChannelStreamWriterResolverManager.java} | 19 +++--
...anager.java => DefaultChannelStreamWriter.java} | 36 +++++++---
.../org/apache/sshd/common/forward/SocksProxy.java | 8 +--
.../sshd/common/forward/TcpipClientChannel.java | 2 +-
.../common/helpers/AbstractFactoryManager.java | 12 ++--
.../apache/sshd/common/io/nio2/Nio2Session.java | 4 +-
.../org/apache/sshd/common/session/Session.java | 34 +++++----
.../common/session/helpers/AbstractSession.java | 49 +++++++++++--
.../sshd/common/session/helpers/SessionHelper.java | 20 +++---
.../sshd/server/forward/TcpipServerChannel.java | 34 ++++-----
.../sshd/server/session/AbstractServerSession.java | 4 +-
.../sshd/server/x11/ChannelForwardedX11.java | 2 +-
.../src/test/java/org/apache/sshd/LoadTest.java | 4 --
.../java/org/apache/sshd/WindowAdjustTest.java | 2 +-
.../java/org/apache/sshd/client/ClientTest.java | 9 +--
.../org/apache/sshd/common/channel/WindowTest.java | 5 +-
.../common/forward/PortForwardingLoadTest.java | 5 +-
.../session/helpers/AbstractSessionTest.java | 2 +-
.../sshd/server/ServerProxyAcceptorTest.java | 2 +-
.../sshd/util/test/AsyncEchoShellFactory.java | 2 +-
.../org/apache/sshd/util/test/BogusChannel.java | 14 ++--
.../java/org/apache/sshd/mina/MinaSession.java | 2 +-
.../java/org/apache/sshd/netty/NettyIoSession.java | 2 +-
.../sshd/sftp/client/impl/DefaultSftpClient.java | 4 +-
.../org/apache/sshd/sftp/server/SftpSubsystem.java | 2 +-
43 files changed, 359 insertions(+), 273 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index fc2776d..23f04ef 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ or `-key-file` command line option.
* [SSHD-1034](https://issues.apache.org/jira/browse/SSHD-1034) Rename \
`org.apache.sshd.common.ForwardingFilter` to `Forwarder`.
* [SSHD-1035](https://issues.apache.org/jira/browse/SSHD-1035) Move property \
definitions to common locations.
* [SSHD-1038](https://issues.apache.org/jira/browse/SSHD-1038) Refactor packages from a module into a cleaner hierarchy.
+* [SSHD-1080](https://issues.apache.org/jira/browse/SSHD-1080) Rework the PacketWriter to split according to the various semantics
* [SSHD-1084](https://issues.apache.org/jira/browse/SSHD-1084) Revert the usage of \
asynchronous streams when forwarding ports.
## Minor code helpers
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java \
b/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java index \
faa509c..8fb86e3 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoInputStream.java
@@ -21,6 +21,11 @@ package org.apache.sshd.common.io;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.util.buffer.Buffer;
+/**
+ * Represents a stream that can be read asynchronously.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
public interface IoInputStream extends Closeable {
/**
* NOTE: the buffer must not be touched until the returned read future is \
completed.
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java \
b/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java index \
e98e5f0..64b8876 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
@@ -18,8 +18,26 @@
*/
package org.apache.sshd.common.io;
+import java.io.IOException;
+
import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * Represents a stream that can be written asynchronously.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface IoOutputStream extends Closeable {
+
+ /**
+ * Write the given buffer.
+ *
+ * @param buffer the data to write. <B>NOTE:</B> the buffer must not be touched until the returned write
+ * future is completed.
+ * @return An {@code IoWriteFuture} that can be used to check when the data has actually been written.
+ * @throws IOException if an error occurred when writing the data
+ */
+ IoWriteFuture writeBuffer(Buffer buffer) throws IOException;
-public interface IoOutputStream extends Closeable, PacketWriter {
- // nothing extra
}
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java \
b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java index \
f8de2b4..76b0022 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.net.SocketAddress;
import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.net.ConnectionEndpointsIndicator;
-public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, \
Closeable { +public interface IoSession extends ConnectionEndpointsIndicator, \
Closeable {
/**
* @return a unique identifier for this session. Every session has its own ID which is different from any other.
@@ -83,6 +85,27 @@ public interface IoSession extends ConnectionEndpointsIndicator, PacketWriter, C
Object removeAttribute(Object key);
/**
+ * Write a packet on the socket. Multiple writes can be issued concurrently and will be queued.
+ *
+ * @param buffer the buffer send. <B>NOTE:</B> the buffer must not be touched until the returned write future
+ * is completed.
+ * @return An {@code IoWriteFuture} that can be used to check when the packet has actually been sent
+ * @throws IOException if an error occurred when sending the packet
+ */
+ IoWriteFuture writeBuffer(Buffer buffer) throws IOException;
+
+ /**
+ * Closes this session immediately or after all queued write requests are flushed. This operation is asynchronous.
+ * Wait for the returned {@link CloseFuture} if you want to wait for the session actually closed.
+ *
+ * @param immediately {@code true} to close this session immediately. The pending write requests will simply be
+ * discarded. {@code false} to close this session after all queued write requests are flushed.
+ * @return The generated {@link CloseFuture}
+ */
+ @Override
+ CloseFuture close(boolean immediately);
+
+ /**
* @return the {@link IoService} that created this session.
*/
IoService getService();
@@ -97,9 +120,8 @@ public interface IoSession extends ConnectionEndpointsIndicator, \
PacketWriter, C /**
* Suspend read operations on this session. May do nothing if not supported by \
the session implementation.
*
- * If the session usage includes a graceful shutdown with messages being \
exchanged, the caller needs to
- * take care of resuming reading the input in order to actually be able to carry \
on the conversation with
- * the peer.
+ * If the session usage includes a graceful shutdown with messages being \
exchanged, the caller needs to take care of + * resuming reading the input in \
order to actually be able to carry on the conversation with the peer.
*/
default void suspendRead() {
// Do nothing by default, but can be overriden by implementations
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java \
b/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java deleted file \
mode 100644 index 0862728..0000000
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/PacketWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.common.io;
-
-import java.io.IOException;
-import java.nio.channels.Channel;
-
-import org.apache.sshd.common.util.buffer.Buffer;
-
-/**
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
- */
-public interface PacketWriter extends Channel {
- /**
- * Encode and send the given buffer. <B>Note:</B> for session packets the buffer \
has to have 5 bytes free at the
- * beginning to allow the encoding to take place. Also, the write position of \
the buffer has to be set to the
- * position of the last byte to write.
- *
- * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer \
must not be touched until the returned
- * write future is completed.
- * @return An {@code IoWriteFuture} that can be used to check when \
the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
- */
- IoWriteFuture writePacket(Buffer buffer) throws IOException;
-
- /**
- * @param len The packet payload size
- * @param blockSize The cipher block size
- * @param etmMode Whether using "encrypt-then-MAC" mode
- * @return The required padding length
- */
- static int calculatePadLength(int len, int blockSize, boolean etmMode) {
- /*
- * Note: according to RFC-4253 section 6:
- *
- * The minimum size of a packet is 16 (or the cipher block size, whichever \
is larger) bytes (plus 'mac').
- *
- * Since all out ciphers, MAC(s), etc. have a block size > 8 then the \
minimum size of the packet will be at
- * least 16 due to the padding at the very least - so even packets that \
contain an opcode with no arguments will
- * be above this value. This avoids an un-necessary call to Math.max(len, \
16) for each and every packet
- */
-
- len++; // the pad length
- if (!etmMode) {
- len += Integer.BYTES;
- }
-
- /*
- * Note: according to RFC-4253 section 6:
- *
- * Note that the length of the concatenation of 'packet_length', \
'padding_length', 'payload', and 'random
- * padding' MUST be a multiple of the cipher block size or 8, whichever is \
larger.
- *
- * However, we currently do not have ciphers with a block size of less than \
8 so we do not take this into
- * account in order to accelerate the calculation and avoiding an \
un-necessary call to Math.max(blockSize, 8)
- * for each and every packet.
- */
- int pad = (-len) & (blockSize - 1);
- if (pad < blockSize) {
- pad += blockSize;
- }
-
- return pad;
- }
-}
diff --git a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java \
b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java
similarity index 83%
rename from sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
rename to sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java
index a62e25c..5092b5e 100644
--- a/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriter.java
+++ b/sshd-contrib/src/main/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriter.java
@@ -34,22 +34,22 @@ import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
/**
- * A {@link PacketWriter} delegate implementation that "throttles" the \
output by having a limit on the
- * outstanding packets that have not been sent yet. The {@link #writePacket(Buffer) \
writePacket} implementation make
- * sure that the limit has not been exceeded - if so, then it waits until pending \
packets have been successfully sent
- * before sending the next packet.
+ * A {@link ChannelStreamWriter} delegate implementation that "throttles" \
the output by having a limit on the + * outstanding packets that have not been sent \
yet. The {@link #writeData(Buffer) writePacket} implementation make sure + * that the \
limit has not been exceeded - if so, then it waits until pending packets have been \
successfully sent before + * sending the next packet.
*
* <B>Note:</B> {@link #close() closing} the throttler does not close the delegate \
writer
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ThrottlingPacketWriter extends AbstractLoggingBean implements \
PacketWriter, SshFutureListener<IoWriteFuture> { +public class \
ThrottlingChannelStreamWriter extends AbstractLoggingBean + implements \
ChannelStreamWriter, SshFutureListener<IoWriteFuture> {
/** Timeout (seconds) for throttling packet writer to wait for pending packets \
send */ public static final Property<Duration> WAIT_TIME
= Property.durationSec("packet-writer-wait-time", \
Duration.ofSeconds(30L)); @@ -58,29 +58,30 @@ public class ThrottlingPacketWriter \
extends AbstractLoggingBean implements Packe public static final Property<Integer> \
MAX_PEND_COUNT = Property.integer("packet-writer-max-pend-count", 4096);
- private final PacketWriter delegate;
+ private final ChannelStreamWriter delegate;
private final int maxPendingPackets;
private final long maxWait;
private final AtomicBoolean open = new AtomicBoolean(true);
private final AtomicInteger availableCount;
- public ThrottlingPacketWriter(Channel channel) {
- this(channel, channel);
+ public ThrottlingChannelStreamWriter(Channel channel) {
+ this(new DefaultChannelStreamWriter(channel), channel);
}
- public ThrottlingPacketWriter(PacketWriter delegate, PropertyResolver resolver) \
{ + public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, \
PropertyResolver resolver) {
this(delegate, MAX_PEND_COUNT.getRequired(resolver), \
WAIT_TIME.getRequired(resolver)); }
- public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, \
TimeUnit waitUnit, long waitCount) { + public \
ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, int maxPendingPackets, \
TimeUnit waitUnit, + long waitCount) {
this(delegate, maxPendingPackets, waitUnit.toMillis(waitCount));
}
- public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, \
Duration maxWait) { + public ThrottlingChannelStreamWriter(ChannelStreamWriter \
delegate, int maxPendingPackets, Duration maxWait) { this(delegate, \
maxPendingPackets, maxWait.toMillis()); }
- public ThrottlingPacketWriter(PacketWriter delegate, int maxPendingPackets, long \
maxWait) { + public ThrottlingChannelStreamWriter(ChannelStreamWriter delegate, \
int maxPendingPackets, long maxWait) {
this.delegate = Objects.requireNonNull(delegate, "No delegate provided");
ValidateUtils.checkTrue(maxPendingPackets > 0, "Invalid pending packets \
limit: %d", maxPendingPackets); this.maxPendingPackets = maxPendingPackets;
@@ -89,7 +90,7 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean \
implements Packe this.maxWait = maxWait;
}
- public PacketWriter getDelegate() {
+ public ChannelStreamWriter getDelegate() {
return delegate;
}
@@ -111,7 +112,7 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean \
implements Packe }
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeData(Buffer buffer) throws IOException {
if (!isOpen()) {
throw new ClosedSelectorException();
}
@@ -147,8 +148,8 @@ public class ThrottlingPacketWriter extends AbstractLoggingBean \
implements Packe
throw new EOFException("Negative available packets count: " + \
available); }
- PacketWriter writer = getDelegate();
- return writer.writePacket(buffer).addListener(this);
+ ChannelStreamWriter writer = getDelegate();
+ return writer.writeData(buffer).addListener(this);
}
@Override
diff --git a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java \
b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java
similarity index 76%
rename from sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
rename to sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java
index 7bed6dc..14eb5ca 100644
--- a/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingPacketWriterTest.java
+++ b/sshd-contrib/src/test/java/org/apache/sshd/common/channel/throttle/ThrottlingChannelStreamWriterTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.sshd.common.channel.IoWriteFutureImpl;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.util.test.BaseTestSupport;
@@ -45,27 +44,27 @@ import org.junit.runners.MethodSorters;
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Category({ NoIoTestCase.class })
-public class ThrottlingPacketWriterTest extends BaseTestSupport {
- public ThrottlingPacketWriterTest() {
+public class ThrottlingChannelStreamWriterTest extends BaseTestSupport {
+ public ThrottlingChannelStreamWriterTest() {
super();
}
@Test(timeout = 10_000)
public void testThrottlerWaitsUntilPacketSendSignalled() throws IOException {
- try (ThrottlingPacketWriter throttler
- = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, \
TimeUnit.SECONDS.toMillis(3L))) { + try (ThrottlingChannelStreamWriter \
throttler + = new ThrottlingChannelStreamWriter(new \
MockChannelStreamWriter(), Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) { int maxSize \
= throttler.getMaxPendingPackets(); List<IoWriteFuture> pendingWrites = new \
ArrayList<>(maxSize); Buffer buf = new ByteArrayBuffer(Byte.SIZE);
for (int index = maxSize; index > 0; index--) {
- IoWriteFuture future = throttler.writePacket(buf);
+ IoWriteFuture future = throttler.writeData(buf);
pendingWrites.add(future);
assertEquals("Mismatched available packets count", index - 1, \
throttler.getAvailablePacketsCount()); }
assertEquals("Not all available packet window size exhausted", 0, \
throttler.getAvailablePacketsCount()); try {
- IoWriteFuture future = throttler.writePacket(buf);
+ IoWriteFuture future = throttler.writeData(buf);
fail("Unexpected extra packet success: " + future);
} catch (InterruptedByTimeoutException e) {
// expected
@@ -79,41 +78,41 @@ public class ThrottlingPacketWriterTest extends BaseTestSupport {
}
for (int index = throttler.getAvailablePacketsCount(); index < maxSize; \
index++) {
- throttler.writePacket(buf);
+ throttler.writeData(buf);
}
}
}
@Test(expected = ClosedSelectorException.class, timeout = 10_000)
public void testThrottlerDoesNotSendIfClosed() throws IOException {
- try (PacketWriter throttler
- = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, \
TimeUnit.SECONDS.toMillis(3L))) { + try (ChannelStreamWriter throttler
+ = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), \
Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
assertTrue("Throttler not marked as open", throttler.isOpen());
throttler.close();
assertFalse("Throttler not marked as closed", throttler.isOpen());
- IoWriteFuture future = throttler.writePacket(new \
ByteArrayBuffer(Byte.SIZE)); + IoWriteFuture future = \
throttler.writeData(new ByteArrayBuffer(Byte.SIZE)); fail("Unexpected success: " + \
future); }
}
@Test(expected = ClosedSelectorException.class, timeout = 10_000)
public void testThrottlerStopsSendingIfExceptionSignaledOnFutureOperationCompletion() \
throws IOException {
- try (PacketWriter throttler
- = new ThrottlingPacketWriter(new MockPacketWriter(), Byte.SIZE, \
TimeUnit.SECONDS.toMillis(3L))) { + try (ChannelStreamWriter throttler
+ = new ThrottlingChannelStreamWriter(new MockChannelStreamWriter(), \
Byte.SIZE, TimeUnit.SECONDS.toMillis(3L))) {
assertTrue("Throttler not marked as open", throttler.isOpen());
- IoWriteFutureImpl futureImpl = (IoWriteFutureImpl) \
throttler.writePacket(new ByteArrayBuffer(Byte.SIZE)); + IoWriteFutureImpl \
futureImpl = (IoWriteFutureImpl) throttler.writeData(new \
ByteArrayBuffer(Byte.SIZE));
futureImpl.setValue(new StreamCorruptedException(getCurrentTestName()));
assertFalse("Throttler not marked as closed", throttler.isOpen());
- IoWriteFuture future = throttler.writePacket(new \
ByteArrayBuffer(Byte.SIZE)); + IoWriteFuture future = \
throttler.writeData(new ByteArrayBuffer(Byte.SIZE)); fail("Unexpected success: " + \
future); }
}
- private static class MockPacketWriter implements PacketWriter {
- MockPacketWriter() {
+ private static class MockChannelStreamWriter implements ChannelStreamWriter {
+ MockChannelStreamWriter() {
super();
}
@@ -128,8 +127,8 @@ public class ThrottlingPacketWriterTest extends BaseTestSupport {
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
- return new IoWriteFutureImpl(MockPacketWriter.class.getSimpleName(), \
buffer); + public IoWriteFuture writeData(Buffer buffer) throws IOException {
+ return new \
IoWriteFutureImpl(MockChannelStreamWriter.class.getSimpleName(), buffer); }
}
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java \
b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java index \
6b7aa97..6c24a5c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/BaseBuilder.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.RequestHandler;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
import org.apache.sshd.common.cipher.BuiltinCiphers;
import org.apache.sshd.common.cipher.Cipher;
import org.apache.sshd.common.compression.Compression;
@@ -148,7 +148,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S \
extends BaseBuilder protected ForwarderFactory forwarderFactory;
protected List<RequestHandler<ConnectionService>> globalRequestHandlers;
protected ForwardingFilter forwardingFilter;
- protected ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ protected ChannelStreamWriterResolver channelStreamPacketWriterResolver;
protected UnknownChannelReferenceHandler unknownChannelReferenceHandler;
public BaseBuilder() {
@@ -247,7 +247,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S \
extends BaseBuilder return me();
}
- public S channelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver \
resolver) { + public S \
channelStreamPacketWriterResolver(ChannelStreamWriterResolver resolver) { \
channelStreamPacketWriterResolver = resolver; return me();
}
@@ -275,7 +275,7 @@ public class BaseBuilder<T extends AbstractFactoryManager, S \
extends BaseBuilder ssh.setForwardingFilter(forwardingFilter);
ssh.setForwarderFactory(forwarderFactory);
ssh.setGlobalRequestHandlers(globalRequestHandlers);
- ssh.setChannelStreamPacketWriterResolver(channelStreamPacketWriterResolver);
+ ssh.setChannelStreamWriterResolver(channelStreamPacketWriterResolver);
ssh.setUnknownChannelReferenceHandler(unknownChannelReferenceHandler);
return ssh;
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java \
b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java index \
47340c9..52df0a9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -26,7 +26,7 @@ import org.apache.sshd.agent.SshAgentFactory;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.ChannelListenerManager;
import org.apache.sshd.common.channel.RequestHandler;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.file.FileSystemFactory;
import org.apache.sshd.common.forward.ForwarderFactory;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
@@ -56,7 +56,7 @@ public interface FactoryManager
ReservedSessionMessagesManager,
SessionDisconnectHandlerManager,
ChannelListenerManager,
- ChannelStreamPacketWriterResolverManager,
+ ChannelStreamWriterResolverManager,
UnknownChannelReferenceHandlerManager,
PortForwardingEventListenerManager,
IoServiceEventListenerManager,
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index \
1ce2566..8bb4d99 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -41,8 +41,8 @@ import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.SshConstants;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
@@ -105,7 +105,7 @@ public abstract class AbstractChannel
private final Window localWindow;
private final Window remoteWindow;
- private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ private ChannelStreamWriterResolver channelStreamPacketWriterResolver;
/**
* A {@link Map} of sent requests - key = request name, value = timestamp when \
request was sent. @@ -196,24 +196,24 @@ public abstract class AbstractChannel
}
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() \
{ + public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
return channelStreamPacketWriterResolver;
}
@Override
- public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver \
resolver) { + public void \
setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) { \
channelStreamPacketWriterResolver = resolver; }
@Override
- public ChannelStreamPacketWriterResolver \
resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = \
getChannelStreamPacketWriterResolver(); + public ChannelStreamWriterResolver \
resolveChannelStreamWriterResolver() { + ChannelStreamWriterResolver resolver \
= getChannelStreamWriterResolver(); if (resolver != null) {
return resolver;
}
- ChannelStreamPacketWriterResolverManager manager = getSession();
- return manager.resolveChannelStreamPacketWriterResolver();
+ ChannelStreamWriterResolverManager manager = getSession();
+ return manager.resolveChannelStreamWriterResolver();
}
/**
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java \
index f1f64fd..3ee3ece 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
@@ -32,7 +32,7 @@ import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
/**
- * An {@link IoOutputStream} capable of queuing write requests
+ * An {@link IoOutputStream} capable of queuing write requests.
*/
public class BufferedIoOutputStream extends AbstractInnerCloseable implements \
IoOutputStream { protected final IoOutputStream out;
@@ -50,7 +50,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable \
implements Io }
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
throw new EOFException("Closed - state=" + state);
}
@@ -71,7 +71,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable \
implements Io return;
}
- out.writePacket(future.getBuffer()).addListener(
+ out.writeBuffer(future.getBuffer()).addListener(
new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture f) {
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java index \
4374e88..dae4aa1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
@@ -28,8 +28,8 @@ import org.apache.sshd.common.AttributeRepository;
import org.apache.sshd.common.AttributeStore;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.PropertyResolver;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
+import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.SessionHolder;
@@ -47,8 +47,7 @@ public interface Channel
ChannelListenerManager,
PropertyResolver,
AttributeStore,
- PacketWriter,
- ChannelStreamPacketWriterResolverManager,
+ ChannelStreamWriterResolverManager,
Closeable {
// Known types of channels
String CHANNEL_EXEC = "exec";
@@ -224,4 +223,16 @@ public interface Channel
T value = channel.getAttribute(key);
return (value != null) ? value : \
Session.resolveAttribute(channel.getSession(), key); }
+
+ /**
+ * Encode and send the given buffer. <B>Note:</B> for session packets the buffer \
has to have 5 bytes free at the + * beginning to allow the encoding to take \
place. Also, the write position of the buffer has to be set to the + * position \
of the last byte to write. + *
+ * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer \
must not be touched until the returned + * write future is \
completed. + * @return An {@code IoWriteFuture} that can be used to \
check when the packet has actually been sent + * @throws IOException if an error \
occurred when encoding or sending the packet + */
+ IoWriteFuture writePacket(Buffer buffer) throws IOException;
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index 8b69e69..8d1701f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -24,10 +24,10 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -36,14 +36,14 @@ import org.apache.sshd.common.util.closeable.AbstractCloseable;
public class ChannelAsyncOutputStream extends AbstractCloseable implements \
IoOutputStream, ChannelHolder { private final Channel channelInstance;
- private final PacketWriter packetWriter;
+ private final ChannelStreamWriter packetWriter;
private final byte cmd;
private final AtomicReference<IoWriteFutureImpl> pendingWrite = new \
AtomicReference<>(); private final Object packetWriteId;
public ChannelAsyncOutputStream(Channel channel, byte cmd) {
this.channelInstance = Objects.requireNonNull(channel, "No channel");
- this.packetWriter = \
channelInstance.resolveChannelStreamPacketWriter(channel, cmd); + \
this.packetWriter = channelInstance.resolveChannelStreamWriter(channel, cmd); \
this.cmd = cmd;
this.packetWriteId = channel.toString() + "[" + \
SshConstants.getCommandMessageName(cmd) + "]"; }
@@ -58,14 +58,14 @@ public class ChannelAsyncOutputStream extends AbstractCloseable \
implements IoOut }
@Override
- public synchronized IoWriteFuture writePacket(Buffer buffer) throws IOException \
{ + public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws \
IOException { if (isClosing()) {
throw new EOFException("Closing: " + state);
}
IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, buffer);
if (!pendingWrite.compareAndSet(null, future)) {
- throw new WritePendingException("No write pending future");
+ throw new WritePendingException("A write operation is already pending");
}
doWriteIfPossible(false);
return future;
@@ -164,7 +164,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable \
implements IoOut remoteWindow.consume(length);
try {
- IoWriteFuture writeFuture = packetWriter.writePacket(buf);
+ IoWriteFuture writeFuture = packetWriter.writeData(buf);
writeFuture.addListener(f -> onWritten(future, total, length, \
f)); } catch (IOException e) {
future.setValue(e);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java \
index b78ff72..42f4e28 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.exception.SshChannelClosedException;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
public class ChannelOutputStream extends OutputStream implements \
java.nio.channels.Channel, ChannelHolder {
private final AbstractChannel channelInstance;
- private final PacketWriter packetWriter;
+ private final ChannelStreamWriter packetWriter;
private final Window remoteWindow;
private final Duration maxWaitTimeout;
private final Logger log;
@@ -76,7 +76,7 @@ public class ChannelOutputStream extends OutputStream implements \
java.nio.channe
AbstractChannel channel, Window remoteWindow, \
Duration maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) {
this.channelInstance = Objects.requireNonNull(channel, "No channel");
- this.packetWriter = \
channelInstance.resolveChannelStreamPacketWriter(channel, cmd); + \
this.packetWriter = \
channelInstance.resolveChannelStreamWriter(channel, cmd);
this.remoteWindow = Objects.requireNonNull(remoteWindow, "No remote \
window"); Objects.requireNonNull(maxWaitTimeout, "No maxWaitTimeout");
ValidateUtils.checkTrue(GenericUtils.isPositive(maxWaitTimeout), \
"Non-positive max. wait time: %s", @@ -240,7 +240,7 @@ public class \
ChannelOutputStream extends OutputStream implements java.nio.channe \
log.trace("flush({}) send {} len={}",
channel, SshConstants.getCommandMessageName(cmd), \
length); }
- packetWriter.writePacket(buf);
+ packetWriter.writeData(buf);
}
} catch (WindowClosedException e) {
if (!closedState.getAndSet(true)) {
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java \
index 6fee66a..7bdba6b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
@@ -26,12 +26,11 @@ import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
import org.apache.sshd.common.util.io.IoUtils;
-import org.apache.sshd.server.forward.TcpipServerChannel;
/**
* An implementation of {@link IoOutputStream} using a synchronous {@link \
ChannelOutputStream}.
*
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class SimpleIoOutputStream extends AbstractCloseable implements \
IoOutputStream {
@@ -48,7 +47,7 @@ public class SimpleIoOutputStream extends AbstractCloseable \
implements IoOutputS }
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
os.write(buffer.array(), buffer.rpos(), buffer.available());
os.flush();
DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java index \
e2d7b94..0b33ad5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
@@ -21,7 +21,7 @@ package org.apache.sshd.common.channel;
/**
* A channel that can be either configured to use synchronous or asynchronous \
streams.
*
- * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public interface StreamingChannel {
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java
new file mode 100644
index 0000000..a9b643a
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel.throttle;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * The ChannelStreamWriter is used when writing to the channel data stream. This \
data is encoded and sent with the + * {@link \
org.apache.sshd.common.SshConstants#SSH_MSG_CHANNEL_DATA} and + * {@link \
org.apache.sshd.common.SshConstants#SSH_MSG_CHANNEL_EXTENDED_DATA} commands. + *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelStreamWriter extends Channel {
+
+ /**
+ * Encode and send the given data packet buffer. <B>Note:</B> the buffer has to \
have 5 bytes free at the beginning + * to allow the encoding to take place. Also, \
the write position of the buffer has to be set to the position of the + * last \
byte to write. + *
+ * @param buffer the buffer to encode and send. <B>NOTE:</B> the buffer \
must not be touched until the returned + * write future is \
completed. + * @return An {@code IoWriteFuture} that can be used to \
check when the packet has actually been sent + * @throws IOException if an error \
occurred when encoding or sending the packet + */
+ IoWriteFuture writeData(Buffer buffer) throws IOException;
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java
similarity index 77%
rename from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
rename to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java
index 75aaaea..0b71544 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolver.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolver.java
@@ -19,7 +19,6 @@
package org.apache.sshd.common.channel.throttle;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.io.PacketWriter;
/**
* A special mechanism that enables users to intervene in the way packets are sent \
from {@code ChannelOutputStream}-s - @@ -28,18 +27,19 @@ import \
org.apache.sshd.common.io.PacketWriter;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
@FunctionalInterface
-public interface ChannelStreamPacketWriterResolver {
+public interface ChannelStreamWriterResolver {
/**
* An identity resolver - i.e., no special intervention - simply use the channel \
itself
*/
- ChannelStreamPacketWriterResolver NONE = (channel, cmd) -> channel;
+ ChannelStreamWriterResolver NONE = (channel, cmd) -> new \
DefaultChannelStreamWriter(channel);
/**
* @param channel The original {@link Channel}
* @param cmd The {@code SSH_MSG_CHANNEL_DATA} or {@code \
SSH_MSG_CHANNEL_EXTENDED_DATA} command that triggered
* the resolution
- * @return The {@link PacketWriter} to use - <B>Note:</B> if the return \
value is not a {@link Channel} then
- * it will be closed when the stream is closed
+ * @return The {@link ChannelStreamWriter} to use - <B>Note:</B> if the \
return value is not a + * {@link Channel} then it will be closed \
when the stream is closed
*/
- PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd);
+ ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd);
+
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java
similarity index 54%
copy from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
copy to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java
index e50eeb8..34f8391 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamWriterResolverManager.java
@@ -19,26 +19,25 @@
package org.apache.sshd.common.channel.throttle;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.io.PacketWriter;
/**
* TODO Add javadoc
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface ChannelStreamPacketWriterResolverManager extends \
ChannelStreamPacketWriterResolver {
- ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver();
+public interface ChannelStreamWriterResolverManager extends \
ChannelStreamWriterResolver { + ChannelStreamWriterResolver \
getChannelStreamWriterResolver();
- void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver \
resolver); + void setChannelStreamWriterResolver(ChannelStreamWriterResolver \
resolver);
- default ChannelStreamPacketWriterResolver \
resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = \
getChannelStreamPacketWriterResolver();
- return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : \
resolver; + default ChannelStreamWriterResolver \
resolveChannelStreamWriterResolver() { + return \
getChannelStreamWriterResolver(); }
@Override
- default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) \
{
- ChannelStreamPacketWriterResolver resolver = \
resolveChannelStreamPacketWriterResolver();
- return (resolver == null) ? channel : \
resolver.resolveChannelStreamPacketWriter(channel, cmd); + default \
ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd) { + \
ChannelStreamWriterResolver resolver = resolveChannelStreamWriterResolver(); + \
ChannelStreamWriterResolver actual = (resolver == null) ? \
ChannelStreamWriterResolver.NONE : resolver; + return \
actual.resolveChannelStreamWriter(channel, cmd); }
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java \
b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java
similarity index 53%
rename from sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
rename to sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java
index e50eeb8..83b1a91 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/ChannelStreamPacketWriterResolverManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/throttle/DefaultChannelStreamWriter.java
@@ -18,27 +18,41 @@
*/
package org.apache.sshd.common.channel.throttle;
+import java.io.IOException;
+
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
/**
- * TODO Add javadoc
+ * A ChannelStreamWriter that simply calls the {@link Channel#writePacket(Buffer)} \
method.
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface ChannelStreamPacketWriterResolverManager extends \
ChannelStreamPacketWriterResolver {
- ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver();
+public class DefaultChannelStreamWriter implements ChannelStreamWriter {
+
+ protected final Channel channel;
+ protected volatile boolean closed;
- void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver \
resolver); + public DefaultChannelStreamWriter(Channel channel) {
+ this.channel = channel;
+ }
- default ChannelStreamPacketWriterResolver \
resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = \
getChannelStreamPacketWriterResolver();
- return (resolver == null) ? ChannelStreamPacketWriterResolver.NONE : \
resolver; + @Override
+ public IoWriteFuture writeData(Buffer buffer) throws IOException {
+ if (closed) {
+ throw new IOException("ChannelStreamPacketWriter has been closed");
+ }
+ return channel.writePacket(buffer);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !closed;
}
@Override
- default PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) \
{
- ChannelStreamPacketWriterResolver resolver = \
resolveChannelStreamPacketWriterResolver();
- return (resolver == null) ? channel : \
resolver.resolveChannelStreamPacketWriter(channel, cmd); + public void close() \
throws IOException { + closed = true;
}
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java \
b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java index \
f87d35c..156780e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -103,7 +103,7 @@ public class SocksProxy extends AbstractCloseable implements \
IoHandler { protected void onMessage(Buffer buffer) throws IOException {
IoOutputStream asyncIn = channel.getAsyncIn();
if (asyncIn != null) {
- asyncIn.writePacket(buffer);
+ asyncIn.writeBuffer(buffer);
} else {
OutputStream invertedIn = channel.getInvertedIn();
invertedIn.write(buffer.array(), buffer.rpos(), buffer.available());
@@ -185,7 +185,7 @@ public class SocksProxy extends AbstractCloseable implements \
IoHandler { buffer.putByte((byte) 0x00);
buffer.putByte((byte) 0x00);
try {
- session.writePacket(buffer);
+ session.writeBuffer(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
log.error("Failed ({}) to send channel open packet for {}: {}", \
e.getClass().getSimpleName(), channel, @@ -229,7 +229,7 @@ public class SocksProxy \
extends AbstractCloseable implements IoHandler { buffer = new \
ByteArrayBuffer(Byte.SIZE, false); buffer.putByte((byte) 0x05);
buffer.putByte((byte) (foundNoAuth ? 0x00 : 0xFF));
- session.writePacket(buffer);
+ session.writeBuffer(buffer);
if (!foundNoAuth) {
throw new IllegalStateException("Received socks5 greeting \
without NoAuth method"); } else if (debugEnabled) {
@@ -304,7 +304,7 @@ public class SocksProxy extends AbstractCloseable implements \
IoHandler { }
response.wpos(wpos);
try {
- session.writePacket(response);
+ session.writeBuffer(response);
} catch (IOException e) {
log.error("Failed ({}) to send channel open response for {}: {}", \
e.getClass().getSimpleName(), channel, e.getMessage());
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java \
index c743948..2282b9a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -210,7 +210,7 @@ public class TcpipClientChannel extends AbstractClientChannel \
implements Forward
Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len);
Window wLocal = getLocalWindow();
wLocal.consumeAndCheck(len);
- serverSession.writePacket(buf);
+ serverSession.writeBuffer(buf);
}
@Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java \
b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java \
index 577da74..3abf709 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/helpers/AbstractFactoryManager.java
@@ -42,7 +42,7 @@ import org.apache.sshd.common.SyspropsMapWrapper;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.channel.RequestHandler;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
import org.apache.sshd.common.config.VersionProperties;
import org.apache.sshd.common.file.FileSystemFactory;
import org.apache.sshd.common.forward.ForwarderFactory;
@@ -96,7 +96,7 @@ public abstract class AbstractFactoryManager extends \
AbstractKexFactoryManager i
private PropertyResolver parentResolver = SyspropsMapWrapper.SYSPROPS_RESOLVER;
private ReservedSessionMessagesHandler reservedSessionMessagesHandler;
private SessionDisconnectHandler sessionDisconnectHandler;
- private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ private ChannelStreamWriterResolver channelStreamWriterResolver;
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
private IoServiceEventListener eventListener;
@@ -314,13 +314,13 @@ public abstract class AbstractFactoryManager extends \
AbstractKexFactoryManager i }
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() \
{
- return channelStreamPacketWriterResolver;
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
+ return channelStreamWriterResolver;
}
@Override
- public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver \
resolver) {
- channelStreamPacketWriterResolver = resolver;
+ public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) \
{ + channelStreamWriterResolver = resolver;
}
@Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java \
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java index \
2200ba2..af9d0e2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -166,9 +166,9 @@ public class Nio2Session extends AbstractCloseable implements \
IoSession { }
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (log.isDebugEnabled()) {
- log.debug("writePacket({}) Writing {} bytes", this, buffer.available());
+ log.debug("writeBuffer({}) writing {} bytes", this, buffer.available());
}
ByteBuffer buf = ByteBuffer.wrap(buffer.array(), buffer.rpos(), \
buffer.available());
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java \
b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java index \
047dac7..081ccf4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -31,13 +31,12 @@ import org.apache.sshd.common.FactoryManagerHolder;
import org.apache.sshd.common.Service;
import org.apache.sshd.common.auth.MutableUserHolder;
import org.apache.sshd.common.channel.ChannelListenerManager;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.forward.PortForwardingInformationProvider;
import org.apache.sshd.common.future.KeyExchangeFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.kex.KexFactoryManager;
import org.apache.sshd.common.kex.KeyExchange;
import org.apache.sshd.common.session.helpers.TimeoutIndicator;
@@ -58,12 +57,11 @@ public interface Session
ReservedSessionMessagesManager,
SessionDisconnectHandlerManager,
ChannelListenerManager,
- ChannelStreamPacketWriterResolverManager,
+ ChannelStreamWriterResolverManager,
PortForwardingEventListenerManager,
UnknownChannelReferenceHandlerManager,
FactoryManagerHolder,
- PortForwardingInformationProvider,
- PacketWriter {
+ PortForwardingInformationProvider {
/**
* Create a new buffer for the specified SSH packet and reserve the needed space \
(5 bytes) for the packet header. @@ -103,7 +101,7 @@ public interface Session
* "null" string is sent
* @param lang The language - {@code null}/empty if some pre-agreed \
default is used
* @return An {@code IoWriteFuture} that can be used to check when \
the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @see <A \
HREF="https://tools.ietf.org/html/rfc4253#section-11.3">RFC 4253 - \
section 11.3</A>
*/
IoWriteFuture sendDebugMessage(boolean display, Object msg, String lang) throws \
IOException; @@ -113,12 +111,22 @@ public interface Session
*
* @param data The message data
* @return An {@code IoWriteFuture} that can be used to check when \
the packet has actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @see <A \
HREF="https://tools.ietf.org/html/rfc4253#section-11.2">RFC 4253 - \
section 11.2</A>
*/
IoWriteFuture sendIgnoreMessage(byte... data) throws IOException;
/**
+ * Encode and send the given buffer. The buffer has to have 5 bytes free at the \
beginning to allow the encoding to + * take place. Also, the write position of \
the buffer has to be set to the position of the last byte to write. + *
+ * @param buffer the buffer to encode and send
+ * @return An {@code IoWriteFuture} that can be used to check when \
the packet has actually been sent + * @throws IOException if an error occurred \
when encoding or sending the packet + */
+ IoWriteFuture writePacket(Buffer buffer) throws IOException;
+
+ /**
* Encode and send the given buffer with the specified timeout. If the buffer \
could not be written before the
* timeout elapses, the returned {@link org.apache.sshd.common.io.IoWriteFuture} \
will be set with a
* {@link java.util.concurrent.TimeoutException} exception to indicate a \
timeout. @@ -127,7 +135,7 @@ public interface Session
* @param timeout the (never {@code null}) timeout value - its {@link \
Duration#toMillis() milliseconds} value
* will be used
* @return a future that can be used to check when the packet has \
actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
* @see #writePacket(Buffer, long)
*/
default IoWriteFuture writePacket(Buffer buffer, Duration timeout) throws \
IOException { @@ -143,7 +151,7 @@ public interface Session
* @param buffer the buffer to encode and send
* @param maxWaitMillis the timeout in milliseconds
* @return a future that can be used to check when the packet has \
actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the \
packet
*/
default IoWriteFuture writePacket(Buffer buffer, long maxWaitMillis) throws \
IOException { return writePacket(buffer, maxWaitMillis, TimeUnit.MILLISECONDS);
@@ -158,7 +166,7 @@ public interface Session
* @param timeout the timeout
* @param unit the time unit of the timeout parameter
* @return a future that can be used to check when the packet has \
actually been sent
- * @throws IOException if an error occurred when encoding sending the packet
+ * @throws IOException if an error occurred when encoding or sending the packet
*/
IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws \
IOException;
@@ -171,7 +179,7 @@ public interface Session
* @param timeout The number of time units to wait - \
must be <U>positive</U>
* @param unit The {@link TimeUnit} to wait for the \
response
* @return the return buffer if the request was \
successful, {@code null} otherwise.
- * @throws IOException if an error occurred when encoding \
sending the packet + * @throws IOException if an error \
occurred when encoding or sending the packet
* @throws java.net.SocketTimeoutException If no response received within \
specified timeout
*/
default Buffer request(
@@ -190,7 +198,7 @@ public interface Session
* @param buffer the buffer containing the global \
request
* @param timeout The (never {@code null}) timeout to \
wait - its milliseconds value is used
* @return the return buffer if the request was \
successful, {@code null} otherwise.
- * @throws IOException if an error occurred when encoding \
sending the packet + * @throws IOException if an error \
occurred when encoding or sending the packet
* @throws java.net.SocketTimeoutException If no response received within \
specified timeout
*/
default Buffer request(String request, Buffer buffer, Duration timeout) throws \
IOException { @@ -206,7 +214,7 @@ public interface Session
* @param buffer the buffer containing the global \
request
* @param maxWaitMillis Max. time to wait for response \
(millis) - must be <U>positive</U>
* @return the return buffer if the request was \
successful, {@code null} otherwise.
- * @throws IOException if an error occurred when encoding \
sending the packet + * @throws IOException if an error \
occurred when encoding or sending the packet
* @throws java.net.SocketTimeoutException If no response received within \
specified timeout
*/
Buffer request(String request, Buffer buffer, long maxWaitMillis) throws \
IOException;
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java \
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java \
index 636a212..124895f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -63,7 +63,6 @@ import org.apache.sshd.common.future.KeyExchangeFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.PacketWriter;
import org.apache.sshd.common.kex.KexProposalOption;
import org.apache.sshd.common.kex.KexState;
import org.apache.sshd.common.kex.KeyExchange;
@@ -254,6 +253,46 @@ public abstract class AbstractSession extends SessionHelper {
}
}
+ /**
+ * @param len The packet payload size
+ * @param blockSize The cipher block size
+ * @param etmMode Whether using "encrypt-then-MAC" mode
+ * @return The required padding length
+ */
+ public static int calculatePadLength(int len, int blockSize, boolean etmMode) {
+ /*
+ * Note: according to RFC-4253 section 6:
+ *
+ * The minimum size of a packet is 16 (or the cipher block size, whichever is larger) bytes (plus 'mac').
+ *
+ * Since all out ciphers, MAC(s), etc. have a block size > 8 then the minimum size of the packet will be at
+ * least 16 due to the padding at the very least - so even packets that contain an opcode with no arguments will
+ * be above this value. This avoids an un-necessary call to Math.max(len, 16) for each and every packet
+ */
+
+ len++; // the pad length
+ if (!etmMode) {
+ len += Integer.BYTES;
+ }
+
+ /*
+ * Note: according to RFC-4253 section 6:
+ *
+ * Note that the length of the concatenation of 'packet_length', 'padding_length', 'payload', and 'random
+ * padding' MUST be a multiple of the cipher block size or 8, whichever is larger.
+ *
+ * However, we currently do not have ciphers with a block size of less than 8 so we do not take this into
+ * account in order to accelerate the calculation and avoiding an un-necessary call to Math.max(blockSize, 8)
+ * for each and every packet.
+ */
+ int pad = (-len) & (blockSize - 1);
+ if (pad < blockSize) {
+ pad += blockSize;
+ }
+
+ return pad;
+ }
+
@Override
public String getServerVersion() {
return serverVersion;
@@ -935,7 +974,7 @@ public abstract class AbstractSession extends SessionHelper {
ignoreBuf = encode(ignoreBuf);
IoSession networkSession = getIoSession();
- networkSession.writePacket(ignoreBuf);
+ networkSession.writeBuffer(ignoreBuf);
}
return encode(buffer);
@@ -948,7 +987,7 @@ public abstract class AbstractSession extends SessionHelper {
synchronized (encodeLock) {
Buffer packet = resolveOutputPacket(buffer);
IoSession networkSession = getIoSession();
- IoWriteFuture future = networkSession.writePacket(packet);
+ IoWriteFuture future = networkSession.writeBuffer(packet);
return future;
}
}
@@ -1104,7 +1143,7 @@ public abstract class AbstractSession extends SessionHelper {
boolean etmMode = outMac != null && outMac.isEncryptThenMac();
int authLen = outCipher != null ? outCipher.getAuthenticationTagSize() : 0;
boolean authMode = authLen > 0;
- int pad = PacketWriter.calculatePadLength(len, outCipherSize, etmMode || \
authMode); + int pad = calculatePadLength(len, outCipherSize, etmMode || \
authMode); len += SshConstants.SSH_PACKET_HEADER_LEN + pad + authLen;
if (outMac != null) {
len += outMacSize;
@@ -1204,7 +1243,7 @@ public abstract class AbstractSession extends SessionHelper {
boolean authMode = authSize > 0;
int oldLen = len;
- int pad = PacketWriter.calculatePadLength(len, outCipherSize, etmMode || \
authMode); + int pad = calculatePadLength(len, outCipherSize, etmMode || \
authMode);
len += Byte.BYTES + pad;
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java \
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java \
index 9c15dda..6883451 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
@@ -48,8 +48,8 @@ import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolverManager;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
import org.apache.sshd.common.digest.Digest;
import org.apache.sshd.common.forward.Forwarder;
import org.apache.sshd.common.future.DefaultSshFuture;
@@ -104,7 +104,7 @@ public abstract class SessionHelper extends \
AbstractKexFactoryManager implements
private ReservedSessionMessagesHandler reservedSessionMessagesHandler;
private SessionDisconnectHandler sessionDisconnectHandler;
private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
- private ChannelStreamPacketWriterResolver channelStreamPacketWriterResolver;
+ private ChannelStreamWriterResolver channelStreamPacketWriterResolver;
/**
* The name of the authenticated user
@@ -514,24 +514,24 @@ public abstract class SessionHelper extends \
AbstractKexFactoryManager implements }
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() {
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
return channelStreamPacketWriterResolver;
}
@Override
- public void setChannelStreamPacketWriterResolver(ChannelStreamPacketWriterResolver resolver) {
+ public void setChannelStreamWriterResolver(ChannelStreamWriterResolver resolver) {
channelStreamPacketWriterResolver = resolver;
}
@Override
- public ChannelStreamPacketWriterResolver resolveChannelStreamPacketWriterResolver() {
- ChannelStreamPacketWriterResolver resolver = getChannelStreamPacketWriterResolver();
+ public ChannelStreamWriterResolver resolveChannelStreamWriterResolver() {
+ ChannelStreamWriterResolver resolver = getChannelStreamWriterResolver();
if (resolver != null) {
return resolver;
}
- ChannelStreamPacketWriterResolverManager manager = getFactoryManager();
- return manager.resolveChannelStreamPacketWriterResolver();
+ ChannelStreamWriterResolverManager manager = getFactoryManager();
+ return manager.resolveChannelStreamWriterResolver();
}
@Override
@@ -793,7 +793,7 @@ public abstract class SessionHelper extends \
AbstractKexFactoryManager implements
IoSession networkSession = getIoSession();
byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
- return networkSession.writePacket(new ByteArrayBuffer(data));
+ return networkSession.writeBuffer(new ByteArrayBuffer(data));
}
/**
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java \
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java \
index e14775a..874b49e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -217,24 +217,26 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Streami if (streaming == Streaming.Async) {
out = new BufferedIoOutputStream(
"tcpip channel", new ChannelAsyncOutputStream(this, \
SshConstants.SSH_MSG_CHANNEL_DATA) {
- @SuppressWarnings("synthetic-access")
- @Override
- protected CloseFuture doCloseGracefully() {
- try {
- sendEof();
- } catch (IOException e) {
- session.exceptionCaught(e);
- }
- return super.doCloseGracefully();
- }
- });
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ return super.doCloseGracefully();
+ }
+ });
} else {
- this.out = new SimpleIoOutputStream(new ChannelOutputStream(
- this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, \
true)); + this.out = new SimpleIoOutputStream(
+ new ChannelOutputStream(
+ this, getRemoteWindow(), log, \
SshConstants.SSH_MSG_CHANNEL_DATA, true));
}
long thresholdHigh = \
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this);
- long thresholdLow = \
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh \
/ 2); + long thresholdLow
+ = CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh \
/ 2); IoHandler handler = new IoHandler() {
@Override
@SuppressWarnings("synthetic-access")
@@ -251,7 +253,7 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Streami if (total > thresholdHigh) {
session.suspendRead();
}
- IoWriteFuture ioWriteFuture = out.writePacket(buffer);
+ IoWriteFuture ioWriteFuture = out.writeBuffer(buffer);
ioWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() \
{ @Override
public void operationComplete(IoWriteFuture future) {
@@ -379,7 +381,7 @@ public class TcpipServerChannel extends AbstractServerChannel \
implements Streami
ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int \
boundaries: %d", len);
// Make sure we copy the data as the incoming buffer may be reused
Buffer buf = ByteArrayBuffer.getCompactClone(data, off, (int) len);
- ioSession.writePacket(buf).addListener(future -> {
+ ioSession.writeBuffer(buf).addListener(future -> {
if (future.isWritten()) {
handleWriteDataSuccess(
SshConstants.SSH_MSG_CHANNEL_DATA, buf.array(), 0, (int) \
len);
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java \
b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java \
index 06fcc46..4c2c9b3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java
@@ -325,7 +325,7 @@ public abstract class AbstractServerSession extends \
AbstractSession implements S startService(authService, buffer);
// Now we can inform the peer that authentication is successful
- future = networkSession.writePacket(packet);
+ future = networkSession.writeBuffer(packet);
}
resetIdleTimeout();
@@ -491,7 +491,7 @@ public abstract class AbstractServerSession extends \
AbstractSession implements S
if (err != null) {
IoSession networkSession = getIoSession();
- networkSession.writePacket(
+ networkSession.writeBuffer(
new ByteArrayBuffer((err.getMessage() + \
"\n").getBytes(StandardCharsets.UTF_8)))
.addListener(future -> close(true));
throw err;
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java \
b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java index \
2c1cdee..9dd046a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
@@ -101,7 +101,7 @@ public class ChannelForwardedX11 extends AbstractClientChannel {
wLocal.consumeAndCheck(len);
// use a clone in case data buffer is re-used
Buffer packet = ByteArrayBuffer.getCompactClone(data, off, (int) len);
- serverSession.writePacket(packet);
+ serverSession.writeBuffer(packet);
}
@Override
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java \
b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java index 7afae32..5c33c0c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -23,20 +23,16 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.sshd.client.ClientBuilder;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.channel.ClientChannelEvent;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.cipher.BuiltinCiphers;
-import org.apache.sshd.common.kex.BuiltinDHFactories;
import org.apache.sshd.common.util.security.SecurityUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.SshServer;
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java \
b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java index \
98afcb8..bb43718 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -300,7 +300,7 @@ public class WindowAdjustTest extends BaseTestSupport {
private void writeWithPendingDetection(Buffer msg, boolean wasPending) \
throws IOException { try {
- asyncIn.writePacket(msg).addListener(future -> {
+ asyncIn.writeBuffer(msg).addListener(future -> {
if (future.isWritten()) {
if (wasPending) {
pending.remove();
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java \
b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java index \
c7487f6..cf2e0cd 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -73,6 +73,7 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.channel.exception.SshChannelClosedException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
@@ -523,7 +524,7 @@ public class ClientTest extends BaseTestSupport {
try (ClientSession session = createTestClientSession();
ChannelShell channel = session.createShellChannel()) {
- channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.setStreaming(StreamingChannel.Streaming.Async);
channel.open().verify(OPEN_TIMEOUT);
byte[] message = "0123456789\n".getBytes(StandardCharsets.UTF_8);
@@ -533,14 +534,14 @@ public class ClientTest extends BaseTestSupport {
AtomicInteger writes = new AtomicInteger(nbMessages);
IoOutputStream asyncIn = channel.getAsyncIn();
- asyncIn.writePacket(new ByteArrayBuffer(message))
+ asyncIn.writeBuffer(new ByteArrayBuffer(message))
.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture future) {
try {
if (future.isWritten()) {
if (writes.decrementAndGet() > 0) {
- asyncIn.writePacket(new \
ByteArrayBuffer(message)).addListener(this); + \
asyncIn.writeBuffer(new ByteArrayBuffer(message)).addListener(this); } else {
asyncIn.close(false);
}
@@ -622,7 +623,7 @@ public class ClientTest extends BaseTestSupport {
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
try (ChannelExec channel = session.createExecChannel("test")) {
- channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.setStreaming(StreamingChannel.Streaming.Async);
OpenFuture open = channel.open();
Thread.sleep(100L); // Removing this line will make the test succeed
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java \
b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java index \
7618206..f42ad61 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/WindowTest.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ChannelShell;
-import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.RuntimeSshException;
@@ -273,7 +272,7 @@ public class WindowTest extends BaseTestSupport {
session.auth().verify(AUTH_TIMEOUT);
try (ChannelShell channel = session.createShellChannel()) {
- channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.setStreaming(StreamingChannel.Streaming.Async);
channel.open().verify(OPEN_TIMEOUT);
try (Channel serverChannel = \
GenericUtils.head(GenericUtils.head(sshd.getActiveSessions()) @@ -290,7 +289,7 @@ \
public class WindowTest extends BaseTestSupport { IoInputStream input = \
channel.getAsyncOut(); for (int i = 0; i < nbMessages; i++) {
Buffer buffer = new ByteArrayBuffer(bytes);
- output.writePacket(buffer).verify(DEFAULT_TIMEOUT);
+ output.writeBuffer(buffer).verify(DEFAULT_TIMEOUT);
waitForWindowNotEquals(clientLocal, serverRemote, "client \
local", "server remote", TimeUnit.SECONDS.toMillis(3L));
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java \
b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java \
index 1715ed2..1bbe342 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java
@@ -322,7 +322,10 @@ public class PortForwardingLoadTest extends BaseTestSupport {
lastReport = readSize;
}
} catch (SocketTimeoutException e) {
- throw new IOException("Error reading data at index " \
+ readSize + "/" + dataBytes.length + " of iteration #" + i, e); + \
throw new IOException( + "Error reading data \
at index " + readSize + "/" + dataBytes.length + " of iteration #" + \
+ i, + e);
}
}
assertPayloadEquals("Mismatched payload at iteration #" + i, \
dataBytes, baos.toByteArray());
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java \
b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
index 35bd8b5..d8fd35f 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
@@ -389,7 +389,7 @@ public class AbstractSessionTest extends BaseTestSupport {
}
@Override
- public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (!isOpen()) {
throw new EOFException("Not open");
}
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java \
b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java index \
0d09ec9..553142e 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerProxyAcceptorTest.java
@@ -132,7 +132,7 @@ public class ServerProxyAcceptorTest extends BaseTestSupport {
client.setClientProxyConnector(session -> {
IoSession ioSession = session.getIoSession();
- ioSession.writePacket(new ByteArrayBuffer(metaDataBytes));
+ ioSession.writeBuffer(new ByteArrayBuffer(metaDataBytes));
});
client.start();
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java \
b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java index \
9084a13..b550893 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -140,7 +140,7 @@ public class AsyncEchoShellFactory implements ShellFactory {
if (buffer.charAt(i) == '\n') {
String s = buffer.substring(0, i + 1);
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
- out.writePacket(new ByteArrayBuffer(bytes)).addListener(future \
-> { + out.writeBuffer(new \
ByteArrayBuffer(bytes)).addListener(future -> { Session session1 = \
channel.getSession(); if (future.isWritten()) {
try {
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java \
b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java index \
8700e7a..80e576c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
@@ -24,8 +24,9 @@ import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.channel.AbstractChannel;
import org.apache.sshd.common.channel.Channel;
-import org.apache.sshd.common.channel.throttle.ChannelStreamPacketWriterResolver;
-import org.apache.sshd.common.io.PacketWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriter;
+import org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolver;
+import org.apache.sshd.common.channel.throttle.DefaultChannelStreamWriter;
import org.apache.sshd.common.util.buffer.Buffer;
public class BogusChannel extends AbstractChannel {
@@ -64,12 +65,13 @@ public class BogusChannel extends AbstractChannel {
}
@Override
- public ChannelStreamPacketWriterResolver getChannelStreamPacketWriterResolver() \
{
- return ChannelStreamPacketWriterResolver.NONE;
+ public ChannelStreamWriterResolver getChannelStreamWriterResolver() {
+ return ChannelStreamWriterResolver.NONE;
}
@Override
- public PacketWriter resolveChannelStreamPacketWriter(Channel channel, byte cmd) \
{
- return channel;
+ public ChannelStreamWriter resolveChannelStreamWriter(Channel channel, byte cmd) \
{ + return new DefaultChannelStreamWriter(channel);
}
+
}
diff --git a/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java \
b/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java index \
af92e6e..04c7ea7 100644
--- a/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java
+++ b/sshd-mina/src/main/java/org/apache/sshd/mina/MinaSession.java
@@ -167,7 +167,7 @@ public class MinaSession extends AbstractInnerCloseable \
implements IoSession { }
@Override // NOTE !!! data buffer may NOT be re-used when method returns - at \
least until IoWriteFuture is signalled
- public IoWriteFuture writePacket(Buffer buffer) {
+ public IoWriteFuture writeBuffer(Buffer buffer) {
return write(MinaSupport.asIoBuffer(buffer));
}
diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java \
b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java index \
984c32d..01ab21d 100644
--- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
+++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
@@ -136,7 +136,7 @@ public class NettyIoSession extends AbstractCloseable implements \
IoSession { }
@Override
- public IoWriteFuture writePacket(Buffer buffer) {
+ public IoWriteFuture writeBuffer(Buffer buffer) {
int bufLen = buffer.available();
ByteBuf buf = Unpooled.buffer(bufLen);
buf.writeBytes(buffer.array(), buffer.rpos(), bufLen);
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java \
b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java \
index 9db3731..ba29ed4 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
@@ -288,7 +288,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
}
IoOutputStream asyncIn = channel.getAsyncIn();
- IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+ IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
writeFuture.verify();
return id;
}
@@ -368,7 +368,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
if (traceEnabled) {
log.trace("init({}) send SSH_FXP_INIT - initial version={}", \
clientChannel, initialVersion); }
- IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+ IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
writeFuture.verify();
if (traceEnabled) {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java \
b/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java index \
034794f..5ad6869 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/server/SftpSubsystem.java
@@ -943,7 +943,7 @@ public class SftpSubsystem
@Override
protected void send(Buffer buffer) throws IOException {
BufferUtils.updateLengthPlaceholder(buffer, 0);
- out.writePacket(buffer);
+ out.writeBuffer(buffer);
}
@Override
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic