[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mina-commits
Subject:    (mina) 01/01: working on the multi-phase synchronization; it was working before but is currently bro
From:       johnnyv () apache ! org
Date:       2024-02-20 2:34:21
Message-ID: 20240220023420.74D274407BC () gitbox2-he-fi ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA-1173
in repository https://gitbox.apache.org/repos/asf/mina.git

commit 0cbbc8b2ef2a10d6c0ce9f2101fb0ed1b51466a8
Author: Jonathan Valliere <jon.valliere@emoten.com>
AuthorDate: Mon Feb 19 21:19:21 2024 -0500

    working on the multi-phase synchronization; it was working before but is \
                currently broken
---
 .../org/apache/mina/filter/ssl/SSLHandlerG0.java   |   2 +-
 .../ssl/{SSLHandlerG0.java => SSLHandlerG1.java}   | 288 ++++++++++++---------
 .../java/org/apache/mina/filter/ssl/SslFilter.java |  19 +-
 3 files changed, 188 insertions(+), 121 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java \
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java index \
                3109426a2..72f3e2bdb 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
@@ -136,7 +136,7 @@ import org.apache.mina.core.write.WriteRequest;
             
             if (mEngine.getUseClientMode()) {
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} open() - begin handshaking", toString());
+                    LOGGER.debug("{} open() - begin handshaking", this);
                 }
                 
                 mEngine.beginHandshake();
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java \
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java similarity \
index 78% copy from mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
 copy to mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
index 3109426a2..5cb6858ea 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
@@ -19,20 +19,21 @@
  */
 package org.apache.mina.filter.ssl;
 
-import java.nio.BufferOverflowException;
-import java.util.ArrayList;
-import java.util.concurrent.Executor;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
-
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.core.write.WriteRejectedException;
 import org.apache.mina.core.write.WriteRequest;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import java.nio.BufferOverflowException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+
 /**
  * Default implementation of SSLHandler
  * <p>
@@ -42,7 +43,7 @@ import org.apache.mina.core.write.WriteRequest;
  * @author Jonathan Valliere
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-/* package protected */ class SSLHandlerG0 extends SslHandler {
+/* package protected */ class SSLHandlerG1 extends SslHandler {
 
     /**
      * Maximum number of queued messages waiting for encoding
@@ -67,7 +68,7 @@ import org.apache.mina.core.write.WriteRequest;
     /**
      * Enable asynchronous tasks
      */
-    static protected final boolean ENABLE_ASYNC_TASKS = true;
+    static protected final boolean ENABLE_ASYNC_TASKS = false;
 
     /**
      * Indicates whether the first handshake was completed
@@ -90,9 +91,19 @@ import org.apache.mina.core.write.WriteRequest;
     protected boolean mOutboundLinger = false;
 
     /**
-     * Holds the decoder thread reference; used for recursion detection
+     * Holds the decoder thread reference; used for recursion detection introduced \
by a delegated task +     */
+    protected volatile Thread mReceiveThread = null;
+
+    /**
+     * Encoded buffers ready for processing upstream
+     */
+    protected final Deque<EncryptedWriteRequest> mWriteQueue = new \
ConcurrentLinkedDeque<>(); +
+    /**
+     * Decoded buffers ready for processing downstream
      */
-    protected Thread mDecodeThread = null;
+    protected final Deque<IoBuffer> mReceiveQueue = new ConcurrentLinkedDeque<>();
 
     /**
      * Captured error state
@@ -101,12 +112,12 @@ import org.apache.mina.core.write.WriteRequest;
 
     /**
      * Instantiates a new handler
-     * 
+     *
      * @param sslEngine The SSLEngine instance
      * @param executor The executor instance to use to process tasks
      * @param session The session to handle
      */
-    public SSLHandlerG0(SSLEngine sslEngine, Executor executor, IoSession session) {
+    public SSLHandlerG1(SSLEngine sslEngine, Executor executor, IoSession session) {
         super(sslEngine, executor, session);
     }
 
@@ -130,49 +141,72 @@ import org.apache.mina.core.write.WriteRequest;
      * {@inheritDoc}
      */
     @Override
-    synchronized public void open(NextFilter next) throws SSLException {
-        if (mHandshakeStarted == false) {
-            mHandshakeStarted = true;
-            
-            if (mEngine.getUseClientMode()) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} open() - begin handshaking", toString());
+    public void open(NextFilter next) throws SSLException {
+        synchronized (this) {
+            if (mHandshakeStarted == false) {
+                mHandshakeStarted = true;
+                if (mEngine.getUseClientMode()) {
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("{} open() - begin handshaking", this);
+                    }
+                    mEngine.beginHandshake();
+                    write_handshake(next);
                 }
-                
-                mEngine.beginHandshake();
-                write_handshake(next);
             }
         }
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while((x = mWriteQueue.poll()) != null) {
+                next.filterWrite(mSession, x);
+            }
+        }
+        synchronized (this) {
+            throw_pending_error(next);
+        }
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    synchronized public void receive(NextFilter next, IoBuffer message) throws \
                SSLException {
-        if (mDecodeThread == null) {
+    public void receive(NextFilter next, IoBuffer message) throws SSLException {
+        receive_start(next, message);
+        synchronized (mReceiveQueue) {
+            IoBuffer x;
+            while((x = mReceiveQueue.poll()) != null) {
+                next.messageReceived(mSession, x);
+            }
+        }
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while((x = mWriteQueue.poll()) != null) {
+                next.filterWrite(mSession, x);
+            }
+        }
+        synchronized (this) {
+            throw_pending_error(next);
+        }
+    }
+
+    synchronized protected void receive_start(NextFilter next, IoBuffer message) \
throws SSLException { +        if(mReceiveThread == Thread.currentThread()) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("{} receive() - recursion", toString());
+            }
+            receive_loop(next, mDecodeBuffer);
+        } else {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("{} receive() - message {}", toString(), message);
             }
-            
-            mDecodeThread = Thread.currentThread();
+            mReceiveThread = Thread.currentThread();
             IoBuffer source = resume_decode_buffer(message);
-            
             try {
                 receive_loop(next, source);
             } finally {
                 suspend_decode_buffer(source);
-                mDecodeThread = null;
+                mReceiveThread = null;
             }
-        } else {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("{} receive() - recursion", toString());
-            }
-            
-            receive_loop(next, mDecodeBuffer);
         }
-
-        throw_pending_error(next);
     }
 
     /**
@@ -232,8 +266,8 @@ import org.apache.mina.core.write.WriteRequest;
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("{} receive_loop() - result {}", toString(), dest);
             }
-            
-            next.messageReceived(mSession, dest);
+
+            mReceiveQueue.push(dest);
         }
 
         switch (result.getHandshakeStatus()) {
@@ -288,61 +322,72 @@ import org.apache.mina.core.write.WriteRequest;
      * {@inheritDoc}
      */
     @Override
-    synchronized public void ack(NextFilter next, WriteRequest request) throws \
                SSLException {
-        if (mAckQueue.remove(request)) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("{} ack() - {}", toString(), request);
+    public void ack(NextFilter next, WriteRequest request) throws SSLException {
+        synchronized (this) {
+            if (mAckQueue.remove(request)) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("{} ack() - {}", toString(), request);
+                }
+
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("{} ack() - checking to see if any messages can be \
flushed", toString(), request); +                }
+                flush_start(next);
             }
-            
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("{} ack() - checking to see if any messages can be \
flushed", toString(), request); +        }
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while((x = mWriteQueue.poll()) != null) {
+                next.filterWrite(mSession, x);
             }
-            
-            flush(next);
         }
-
-        throw_pending_error(next);
+        synchronized (this) {
+            throw_pending_error(next);
+        }
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    synchronized public void write(NextFilter next, WriteRequest request) throws \
                SSLException, WriteRejectedException {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("{} write() - source {}", toString(), request);
-        }
-
-        if (mOutboundClosing) {
-            throw new WriteRejectedException(request, "closing");
-        }
-
-        if (mEncodeQueue.isEmpty()) {
-            if (write_user_loop(next, request) == false) {
+    public void write(NextFilter next, WriteRequest request) throws SSLException, \
WriteRejectedException { +        synchronized (this) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("{} write() - source {}", toString(), request);
+            }
+            if (mOutboundClosing) {
+                throw new WriteRejectedException(request, "closing");
+            }
+            if (mEncodeQueue.isEmpty()) {
+                if (write_loop(next, request) == false) {
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("{} write() - unable to write right now, saving \
request for later", toString(), +                                request);
+                    }
+                    if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
+                        throw new BufferOverflowException();
+                    }
+                    mEncodeQueue.add(request);
+                }
+            } else {
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} write() - unable to write right now, saving \
                request for later", toString(),
-                            request);
+                    LOGGER.debug("{} write() - unable to write right now, saving \
request for later", toString(), request);  }
-                
                 if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
                     throw new BufferOverflowException();
                 }
-                
                 mEncodeQueue.add(request);
             }
-        } else {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("{} write() - unable to write right now, saving request \
                for later", toString(), request);
-            }
-            
-            if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
-                throw new BufferOverflowException();
+        }
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while((x = mWriteQueue.poll()) != null) {
+                next.filterWrite(mSession, x);
             }
-            
-            mEncodeQueue.add(request);
         }
-
-        throw_pending_error(next);
+        synchronized (this) {
+            throw_pending_error(next);
+        }
     }
 
     /**
@@ -357,7 +402,7 @@ import org.apache.mina.core.write.WriteRequest;
      * @throws SSLException
      */
     @SuppressWarnings("incomplete-switch")
-    synchronized protected boolean write_user_loop(NextFilter next, WriteRequest \
request) throws SSLException { +    synchronized protected boolean \
write_loop(NextFilter next, WriteRequest request) throws SSLException {  if \
                (LOGGER.isDebugEnabled()) {
             LOGGER.debug("{} write_user_loop() - source {}", toString(), request);
         }
@@ -383,8 +428,8 @@ import org.apache.mina.core.write.WriteRequest;
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} write_user_loop() - result {}", toString(), \
encrypted);  }
-                
-                next.filterWrite(mSession, encrypted);
+
+                mWriteQueue.push(encrypted);
                 // do not return because we want to enter the handshake switch
             } else {
                 // then we probably consumed some data
@@ -397,11 +442,11 @@ import org.apache.mina.core.write.WriteRequest;
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("{} write_user_loop() - result {}", toString(), \
encrypted);  }
-                    
-                    next.filterWrite(mSession, encrypted);
+
+                    mWriteQueue.push(encrypted);
                     
                     if (mAckQueue.size() < MAX_UNACK_MESSAGES) {
-                        return write_user_loop(next, request); // write additional \
chunks +                        return write_loop(next, request); // write additional \
chunks  }
                     
                     return false;
@@ -412,8 +457,8 @@ import org.apache.mina.core.write.WriteRequest;
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("{} write_user_loop() - result {}", toString(), \
encrypted);  }
-                    
-                    next.filterWrite(mSession, encrypted);
+
+                    mWriteQueue.push(encrypted);
                     
                     return true;
                 }
@@ -435,7 +480,7 @@ import org.apache.mina.core.write.WriteRequest;
                     LOGGER.debug("{} write_user_loop() - handshake needs wrap, \
looping", toString());  }
                 
-                return write_user_loop(next, request);
+                return write_loop(next, request);
                 
             case FINISHED:
                 if (LOGGER.isDebugEnabled()) {
@@ -444,7 +489,7 @@ import org.apache.mina.core.write.WriteRequest;
                 
                 finish_handshake(next);
                 
-                return write_user_loop(next, request);
+                return write_loop(next, request);
         }
 
         return false;
@@ -536,7 +581,7 @@ import org.apache.mina.core.write.WriteRequest;
             }
             
             EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null);
-            next.filterWrite(mSession, encrypted);
+            mWriteQueue.push(encrypted);
         }
 
         switch (result.getHandshakeStatus()) {
@@ -544,15 +589,13 @@ import org.apache.mina.core.write.WriteRequest;
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} write_handshake_loop() - handshake needs \
unwrap, invoking receive", toString());  }
-                
-                receive(next, ZERO);
+                receive_start(next, ZERO);
                 break;
                 
             case NEED_WRAP:
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, \
looping", toString());  }
-                
                 write_handshake(next);
                 break;
                 
@@ -560,7 +603,6 @@ import org.apache.mina.core.write.WriteRequest;
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} write_handshake_loop() - handshake needs task, \
scheduling", toString());  }
-                
                 schedule_task(next);
                 break;
                 
@@ -568,7 +610,6 @@ import org.apache.mina.core.write.WriteRequest;
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} write_handshake_loop() - handshake finished, \
flushing queue", toString());  }
-                
                 finish_handshake(next);
                 break;
         }
@@ -592,8 +633,24 @@ import org.apache.mina.core.write.WriteRequest;
         /**
          * There exists a bug in the JDK which emits FINISHED twice instead of once.
          */
-        receive(next, ZERO);
-        flush(next);
+        receive_start(next, ZERO);
+        flush_start(next);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void flush(NextFilter next) throws SSLException {
+        flush_start(next);
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while((x = mWriteQueue.poll()) != null) {
+                next.filterWrite(mSession, x);
+            }
+        }
+        synchronized (this) {
+            throw_pending_error(next);
+        }
     }
 
     /**
@@ -603,7 +660,7 @@ import org.apache.mina.core.write.WriteRequest;
      * 
      * @throws SSLException
      */
-    synchronized public void flush(NextFilter next) throws SSLException {
+    synchronized protected void flush_start(NextFilter next) throws SSLException {
         if (mOutboundClosing && mOutboundLinger == false) {
             return;
         }
@@ -623,7 +680,7 @@ import org.apache.mina.core.write.WriteRequest;
                 LOGGER.debug("{} flush() - {}", toString(), current);
             }
             
-            if (write_user_loop(next, current) == false) {
+            if (write_loop(next, current) == false) {
                 mEncodeQueue.addFirst(current);
                 
                 break;
@@ -643,35 +700,42 @@ import org.apache.mina.core.write.WriteRequest;
      * {@inheritDoc}
      */
     @Override
-    synchronized public void close(NextFilter next, boolean linger) throws \
SSLException { +    public void close(NextFilter next, boolean linger) throws \
SSLException { +        close_start(next, linger);
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while((x = mWriteQueue.poll()) != null) {
+                next.filterWrite(mSession, x);
+            }
+        }
+        synchronized (this) {
+            throw_pending_error(next);
+        }
+    }
+
+    synchronized protected void close_start(NextFilter next, boolean linger) throws \
SSLException {  if (mOutboundClosing) {
             return;
         }
-
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("{} close() - closing session", toString());
         }
-
         if (mHandshakeComplete) {
             next.event(mSession, SslEvent.UNSECURED);
         }
-
         mOutboundLinger = linger;
         mOutboundClosing = true;
-
         if (linger == false) {
             if (mEncodeQueue.size() != 0) {
                 next.exceptionCaught(mSession, new WriteRejectedException(new \
ArrayList<>(mEncodeQueue), "closing"));  mEncodeQueue.clear();
             }
-            
             mEngine.closeOutbound();
-            
             if (ENABLE_SOFT_CLOSURE) {
                 write_handshake(next);
             }
         } else {
-            flush(next);
+            flush_start(next);
         }
     }
 
@@ -713,12 +777,7 @@ import org.apache.mina.core.write.WriteRequest;
      */
     protected void schedule_task(NextFilter next) {
         if (ENABLE_ASYNC_TASKS && (mExecutor != null)) {
-            mExecutor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    SSLHandlerG0.this.execute_task(next);
-                }
-            });
+            mExecutor.execute(() -> SSLHandlerG1.this.execute_task(next));
         } else {
             execute_task(next);
         }
@@ -732,30 +791,25 @@ import org.apache.mina.core.write.WriteRequest;
      * @param next The next filer in the chain
      */
     synchronized protected void execute_task(NextFilter next) {
-        Runnable task = null;
+        Runnable task;
         
         while ((task = mEngine.getDelegatedTask()) != null) {
             try {
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} task() - executing {}", toString(), task);
                 }
-
                 task.run();
-
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("{} task() - writing handshake messages", \
toString());  }
-
                 write_handshake(next);
             } catch (SSLException e) {
                 store_pending_error(e);
-                
                 try { 
                     throw_pending_error(next);
                 } catch ( SSLException ssle) {
                     // ...
                 }
-                
                 if (LOGGER.isErrorEnabled()) {
                     LOGGER.error("{} task() - storing error {}", toString(), e);
                 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java \
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java index \
                418be5ab2..90329f219 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
@@ -60,7 +60,7 @@ public class SslFilter extends IoFilterAdapter {
     /**
      * Returns the SSL2Handler object
      */
-    static protected final AttributeKey SSL_HANDLER = new \
AttributeKey(SslFilter.class, "handler"); +    static protected final AttributeKey \
SSL_HANDLER = new AttributeKey(SslHandler.class, "handler");  
     /**
      * The logger
@@ -74,13 +74,18 @@ public class SslFilter extends IoFilterAdapter {
             new LinkedBlockingDeque<>(), new BasicThreadFactory("ssl-exec", true));
 
     protected final SSLContext sslContext;
-    
+
     /** A flag used to tell the filter to start the handshake immediately (in \
                onPostAdd method)
      *  alternatively handshake will be started after session is connected (in \
                sessionOpened method)
      *  default value is true
      **/
     private final boolean autoStart;
 
+    /**
+     * Enables the non-blocking IO
+     */
+    private boolean nonBlock = true;
+
     /** A flag set if client authentication is required */ 
     protected boolean needClientAuth = false;
 
@@ -135,6 +140,10 @@ public class SslFilter extends IoFilterAdapter {
         this.autoStart = autoStart;
     }
 
+    public void setNonBlocking(boolean enable) {
+        this.nonBlock = enable;
+    }
+
     /**
      * @return <code>true</code> if the engine will <em>require</em> client
      *         authentication. This option is only useful to engines in the server
@@ -299,7 +308,11 @@ public class SslFilter extends IoFilterAdapter {
         if (sslHandler == null) {
             InetSocketAddress s = \
InetSocketAddress.class.cast(session.getRemoteAddress());  SSLEngine sslEngine = \
                createEngine(session, s);
-            sslHandler = new SSLHandlerG0(sslEngine, EXECUTOR, session);
+            if(this.nonBlock){
+                sslHandler = new SSLHandlerG1(sslEngine, EXECUTOR, session);
+            }else {
+                sslHandler = new SSLHandlerG0(sslEngine, EXECUTOR, session);
+            }
             session.setAttribute(SSL_HANDLER, sslHandler);
         }
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic