[prev in list] [next in list] [prev in thread] [next in thread]
List: mina-commits
Subject: (mina) 01/01: working on the multi-phase synchronization; it was working before but is currently bro
From: johnnyv () apache ! org
Date: 2024-02-20 2:34:21
Message-ID: 20240220023420.74D274407BC () gitbox2-he-fi ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
johnnyv pushed a commit to branch bugfix/DIRMINA-1173
in repository https://gitbox.apache.org/repos/asf/mina.git
commit 0cbbc8b2ef2a10d6c0ce9f2101fb0ed1b51466a8
Author: Jonathan Valliere <jon.valliere@emoten.com>
AuthorDate: Mon Feb 19 21:19:21 2024 -0500
working on the multi-phase synchronization; it was working before but is \
currently broken
---
.../org/apache/mina/filter/ssl/SSLHandlerG0.java | 2 +-
.../ssl/{SSLHandlerG0.java => SSLHandlerG1.java} | 288 ++++++++++++---------
.../java/org/apache/mina/filter/ssl/SslFilter.java | 19 +-
3 files changed, 188 insertions(+), 121 deletions(-)
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java \
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java index \
3109426a2..72f3e2bdb 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
@@ -136,7 +136,7 @@ import org.apache.mina.core.write.WriteRequest;
if (mEngine.getUseClientMode()) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} open() - begin handshaking", toString());
+ LOGGER.debug("{} open() - begin handshaking", this);
}
mEngine.beginHandshake();
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java \
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java similarity \
index 78% copy from mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
copy to mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
index 3109426a2..5cb6858ea 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
@@ -19,20 +19,21 @@
*/
package org.apache.mina.filter.ssl;
-import java.nio.BufferOverflowException;
-import java.util.ArrayList;
-import java.util.concurrent.Executor;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
-
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilter.NextFilter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRejectedException;
import org.apache.mina.core.write.WriteRequest;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import java.nio.BufferOverflowException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+
/**
* Default implementation of SSLHandler
* <p>
@@ -42,7 +43,7 @@ import org.apache.mina.core.write.WriteRequest;
* @author Jonathan Valliere
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-/* package protected */ class SSLHandlerG0 extends SslHandler {
+/* package protected */ class SSLHandlerG1 extends SslHandler {
/**
* Maximum number of queued messages waiting for encoding
@@ -67,7 +68,7 @@ import org.apache.mina.core.write.WriteRequest;
/**
* Enable asynchronous tasks
*/
- static protected final boolean ENABLE_ASYNC_TASKS = true;
+ static protected final boolean ENABLE_ASYNC_TASKS = false;
/**
* Indicates whether the first handshake was completed
@@ -90,9 +91,19 @@ import org.apache.mina.core.write.WriteRequest;
protected boolean mOutboundLinger = false;
/**
- * Holds the decoder thread reference; used for recursion detection
+ * Holds the decoder thread reference; used for recursion detection introduced \
by a delegated task + */
+ protected volatile Thread mReceiveThread = null;
+
+ /**
+ * Encoded buffers ready for processing upstream
+ */
+ protected final Deque<EncryptedWriteRequest> mWriteQueue = new \
ConcurrentLinkedDeque<>(); +
+ /**
+ * Decoded buffers ready for processing downstream
*/
- protected Thread mDecodeThread = null;
+ protected final Deque<IoBuffer> mReceiveQueue = new ConcurrentLinkedDeque<>();
/**
* Captured error state
@@ -101,12 +112,12 @@ import org.apache.mina.core.write.WriteRequest;
/**
* Instantiates a new handler
- *
+ *
* @param sslEngine The SSLEngine instance
* @param executor The executor instance to use to process tasks
* @param session The session to handle
*/
- public SSLHandlerG0(SSLEngine sslEngine, Executor executor, IoSession session) {
+ public SSLHandlerG1(SSLEngine sslEngine, Executor executor, IoSession session) {
super(sslEngine, executor, session);
}
@@ -130,49 +141,72 @@ import org.apache.mina.core.write.WriteRequest;
* {@inheritDoc}
*/
@Override
- synchronized public void open(NextFilter next) throws SSLException {
- if (mHandshakeStarted == false) {
- mHandshakeStarted = true;
-
- if (mEngine.getUseClientMode()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} open() - begin handshaking", toString());
+ public void open(NextFilter next) throws SSLException {
+ synchronized (this) {
+ if (mHandshakeStarted == false) {
+ mHandshakeStarted = true;
+ if (mEngine.getUseClientMode()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} open() - begin handshaking", this);
+ }
+ mEngine.beginHandshake();
+ write_handshake(next);
}
-
- mEngine.beginHandshake();
- write_handshake(next);
}
}
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while((x = mWriteQueue.poll()) != null) {
+ next.filterWrite(mSession, x);
+ }
+ }
+ synchronized (this) {
+ throw_pending_error(next);
+ }
}
/**
* {@inheritDoc}
*/
@Override
- synchronized public void receive(NextFilter next, IoBuffer message) throws \
SSLException {
- if (mDecodeThread == null) {
+ public void receive(NextFilter next, IoBuffer message) throws SSLException {
+ receive_start(next, message);
+ synchronized (mReceiveQueue) {
+ IoBuffer x;
+ while((x = mReceiveQueue.poll()) != null) {
+ next.messageReceived(mSession, x);
+ }
+ }
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while((x = mWriteQueue.poll()) != null) {
+ next.filterWrite(mSession, x);
+ }
+ }
+ synchronized (this) {
+ throw_pending_error(next);
+ }
+ }
+
+ synchronized protected void receive_start(NextFilter next, IoBuffer message) \
throws SSLException { + if(mReceiveThread == Thread.currentThread()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} receive() - recursion", toString());
+ }
+ receive_loop(next, mDecodeBuffer);
+ } else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} receive() - message {}", toString(), message);
}
-
- mDecodeThread = Thread.currentThread();
+ mReceiveThread = Thread.currentThread();
IoBuffer source = resume_decode_buffer(message);
-
try {
receive_loop(next, source);
} finally {
suspend_decode_buffer(source);
- mDecodeThread = null;
+ mReceiveThread = null;
}
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} receive() - recursion", toString());
- }
-
- receive_loop(next, mDecodeBuffer);
}
-
- throw_pending_error(next);
}
/**
@@ -232,8 +266,8 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} receive_loop() - result {}", toString(), dest);
}
-
- next.messageReceived(mSession, dest);
+
+ mReceiveQueue.push(dest);
}
switch (result.getHandshakeStatus()) {
@@ -288,61 +322,72 @@ import org.apache.mina.core.write.WriteRequest;
* {@inheritDoc}
*/
@Override
- synchronized public void ack(NextFilter next, WriteRequest request) throws \
SSLException {
- if (mAckQueue.remove(request)) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} ack() - {}", toString(), request);
+ public void ack(NextFilter next, WriteRequest request) throws SSLException {
+ synchronized (this) {
+ if (mAckQueue.remove(request)) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} ack() - {}", toString(), request);
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} ack() - checking to see if any messages can be \
flushed", toString(), request); + }
+ flush_start(next);
}
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} ack() - checking to see if any messages can be \
flushed", toString(), request); + }
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while((x = mWriteQueue.poll()) != null) {
+ next.filterWrite(mSession, x);
}
-
- flush(next);
}
-
- throw_pending_error(next);
+ synchronized (this) {
+ throw_pending_error(next);
+ }
}
/**
* {@inheritDoc}
*/
@Override
- synchronized public void write(NextFilter next, WriteRequest request) throws \
SSLException, WriteRejectedException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - source {}", toString(), request);
- }
-
- if (mOutboundClosing) {
- throw new WriteRejectedException(request, "closing");
- }
-
- if (mEncodeQueue.isEmpty()) {
- if (write_user_loop(next, request) == false) {
+ public void write(NextFilter next, WriteRequest request) throws SSLException, \
WriteRejectedException { + synchronized (this) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} write() - source {}", toString(), request);
+ }
+ if (mOutboundClosing) {
+ throw new WriteRejectedException(request, "closing");
+ }
+ if (mEncodeQueue.isEmpty()) {
+ if (write_loop(next, request) == false) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} write() - unable to write right now, saving \
request for later", toString(), + request);
+ }
+ if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
+ throw new BufferOverflowException();
+ }
+ mEncodeQueue.add(request);
+ }
+ } else {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - unable to write right now, saving \
request for later", toString(),
- request);
+ LOGGER.debug("{} write() - unable to write right now, saving \
request for later", toString(), request); }
-
if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
throw new BufferOverflowException();
}
-
mEncodeQueue.add(request);
}
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - unable to write right now, saving request \
for later", toString(), request);
- }
-
- if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
- throw new BufferOverflowException();
+ }
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while((x = mWriteQueue.poll()) != null) {
+ next.filterWrite(mSession, x);
}
-
- mEncodeQueue.add(request);
}
-
- throw_pending_error(next);
+ synchronized (this) {
+ throw_pending_error(next);
+ }
}
/**
@@ -357,7 +402,7 @@ import org.apache.mina.core.write.WriteRequest;
* @throws SSLException
*/
@SuppressWarnings("incomplete-switch")
- synchronized protected boolean write_user_loop(NextFilter next, WriteRequest \
request) throws SSLException { + synchronized protected boolean \
write_loop(NextFilter next, WriteRequest request) throws SSLException { if \
(LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_user_loop() - source {}", toString(), request);
}
@@ -383,8 +428,8 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_user_loop() - result {}", toString(), \
encrypted); }
-
- next.filterWrite(mSession, encrypted);
+
+ mWriteQueue.push(encrypted);
// do not return because we want to enter the handshake switch
} else {
// then we probably consumed some data
@@ -397,11 +442,11 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_user_loop() - result {}", toString(), \
encrypted); }
-
- next.filterWrite(mSession, encrypted);
+
+ mWriteQueue.push(encrypted);
if (mAckQueue.size() < MAX_UNACK_MESSAGES) {
- return write_user_loop(next, request); // write additional \
chunks + return write_loop(next, request); // write additional \
chunks }
return false;
@@ -412,8 +457,8 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_user_loop() - result {}", toString(), \
encrypted); }
-
- next.filterWrite(mSession, encrypted);
+
+ mWriteQueue.push(encrypted);
return true;
}
@@ -435,7 +480,7 @@ import org.apache.mina.core.write.WriteRequest;
LOGGER.debug("{} write_user_loop() - handshake needs wrap, \
looping", toString()); }
- return write_user_loop(next, request);
+ return write_loop(next, request);
case FINISHED:
if (LOGGER.isDebugEnabled()) {
@@ -444,7 +489,7 @@ import org.apache.mina.core.write.WriteRequest;
finish_handshake(next);
- return write_user_loop(next, request);
+ return write_loop(next, request);
}
return false;
@@ -536,7 +581,7 @@ import org.apache.mina.core.write.WriteRequest;
}
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
- next.filterWrite(mSession, encrypted);
+ mWriteQueue.push(encrypted);
}
switch (result.getHandshakeStatus()) {
@@ -544,15 +589,13 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_handshake_loop() - handshake needs \
unwrap, invoking receive", toString()); }
-
- receive(next, ZERO);
+ receive_start(next, ZERO);
break;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, \
looping", toString()); }
-
write_handshake(next);
break;
@@ -560,7 +603,6 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_handshake_loop() - handshake needs task, \
scheduling", toString()); }
-
schedule_task(next);
break;
@@ -568,7 +610,6 @@ import org.apache.mina.core.write.WriteRequest;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} write_handshake_loop() - handshake finished, \
flushing queue", toString()); }
-
finish_handshake(next);
break;
}
@@ -592,8 +633,24 @@ import org.apache.mina.core.write.WriteRequest;
/**
* There exists a bug in the JDK which emits FINISHED twice instead of once.
*/
- receive(next, ZERO);
- flush(next);
+ receive_start(next, ZERO);
+ flush_start(next);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void flush(NextFilter next) throws SSLException {
+ flush_start(next);
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while((x = mWriteQueue.poll()) != null) {
+ next.filterWrite(mSession, x);
+ }
+ }
+ synchronized (this) {
+ throw_pending_error(next);
+ }
}
/**
@@ -603,7 +660,7 @@ import org.apache.mina.core.write.WriteRequest;
*
* @throws SSLException
*/
- synchronized public void flush(NextFilter next) throws SSLException {
+ synchronized protected void flush_start(NextFilter next) throws SSLException {
if (mOutboundClosing && mOutboundLinger == false) {
return;
}
@@ -623,7 +680,7 @@ import org.apache.mina.core.write.WriteRequest;
LOGGER.debug("{} flush() - {}", toString(), current);
}
- if (write_user_loop(next, current) == false) {
+ if (write_loop(next, current) == false) {
mEncodeQueue.addFirst(current);
break;
@@ -643,35 +700,42 @@ import org.apache.mina.core.write.WriteRequest;
* {@inheritDoc}
*/
@Override
- synchronized public void close(NextFilter next, boolean linger) throws \
SSLException { + public void close(NextFilter next, boolean linger) throws \
SSLException { + close_start(next, linger);
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while((x = mWriteQueue.poll()) != null) {
+ next.filterWrite(mSession, x);
+ }
+ }
+ synchronized (this) {
+ throw_pending_error(next);
+ }
+ }
+
+ synchronized protected void close_start(NextFilter next, boolean linger) throws \
SSLException { if (mOutboundClosing) {
return;
}
-
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} close() - closing session", toString());
}
-
if (mHandshakeComplete) {
next.event(mSession, SslEvent.UNSECURED);
}
-
mOutboundLinger = linger;
mOutboundClosing = true;
-
if (linger == false) {
if (mEncodeQueue.size() != 0) {
next.exceptionCaught(mSession, new WriteRejectedException(new \
ArrayList<>(mEncodeQueue), "closing")); mEncodeQueue.clear();
}
-
mEngine.closeOutbound();
-
if (ENABLE_SOFT_CLOSURE) {
write_handshake(next);
}
} else {
- flush(next);
+ flush_start(next);
}
}
@@ -713,12 +777,7 @@ import org.apache.mina.core.write.WriteRequest;
*/
protected void schedule_task(NextFilter next) {
if (ENABLE_ASYNC_TASKS && (mExecutor != null)) {
- mExecutor.execute(new Runnable() {
- @Override
- public void run() {
- SSLHandlerG0.this.execute_task(next);
- }
- });
+ mExecutor.execute(() -> SSLHandlerG1.this.execute_task(next));
} else {
execute_task(next);
}
@@ -732,30 +791,25 @@ import org.apache.mina.core.write.WriteRequest;
* @param next The next filer in the chain
*/
synchronized protected void execute_task(NextFilter next) {
- Runnable task = null;
+ Runnable task;
while ((task = mEngine.getDelegatedTask()) != null) {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} task() - executing {}", toString(), task);
}
-
task.run();
-
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} task() - writing handshake messages", \
toString()); }
-
write_handshake(next);
} catch (SSLException e) {
store_pending_error(e);
-
try {
throw_pending_error(next);
} catch ( SSLException ssle) {
// ...
}
-
if (LOGGER.isErrorEnabled()) {
LOGGER.error("{} task() - storing error {}", toString(), e);
}
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java \
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java index \
418be5ab2..90329f219 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
@@ -60,7 +60,7 @@ public class SslFilter extends IoFilterAdapter {
/**
* Returns the SSL2Handler object
*/
- static protected final AttributeKey SSL_HANDLER = new \
AttributeKey(SslFilter.class, "handler"); + static protected final AttributeKey \
SSL_HANDLER = new AttributeKey(SslHandler.class, "handler");
/**
* The logger
@@ -74,13 +74,18 @@ public class SslFilter extends IoFilterAdapter {
new LinkedBlockingDeque<>(), new BasicThreadFactory("ssl-exec", true));
protected final SSLContext sslContext;
-
+
/** A flag used to tell the filter to start the handshake immediately (in \
onPostAdd method)
* alternatively handshake will be started after session is connected (in \
sessionOpened method)
* default value is true
**/
private final boolean autoStart;
+ /**
+ * Enables the non-blocking IO
+ */
+ private boolean nonBlock = true;
+
/** A flag set if client authentication is required */
protected boolean needClientAuth = false;
@@ -135,6 +140,10 @@ public class SslFilter extends IoFilterAdapter {
this.autoStart = autoStart;
}
+ public void setNonBlocking(boolean enable) {
+ this.nonBlock = enable;
+ }
+
/**
* @return <code>true</code> if the engine will <em>require</em> client
* authentication. This option is only useful to engines in the server
@@ -299,7 +308,11 @@ public class SslFilter extends IoFilterAdapter {
if (sslHandler == null) {
InetSocketAddress s = \
InetSocketAddress.class.cast(session.getRemoteAddress()); SSLEngine sslEngine = \
createEngine(session, s);
- sslHandler = new SSLHandlerG0(sslEngine, EXECUTOR, session);
+ if(this.nonBlock){
+ sslHandler = new SSLHandlerG1(sslEngine, EXECUTOR, session);
+ }else {
+ sslHandler = new SSLHandlerG0(sslEngine, EXECUTOR, session);
+ }
session.setAttribute(SSL_HANDLER, sslHandler);
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic