[prev in list] [next in list] [prev in thread] [next in thread] 

List:       pykde
Subject:    Re: [PyQt] PyQt5.3 invokeMethod with Qt.BlockingQueuedConnection and return value question
From:       Alan Ezust <alan.ezust () gmail ! com>
Date:       2014-12-14 17:50:01
Message-ID: CALy5K9o4M+Qi8AT02L==eZ7qjwa-+SdNq1C1xE3G8BaPF-8Dgg () mail ! gmail ! com
[Download RAW message or body]

[Attachment #2 (multipart/alternative)]


Thank you again, Phil!
I am attaching an updated version of my test case now that it works,
and has better docs and cleanup code. Hopefully this will  help others
who google for the things that are in my subject line.

​

[Attachment #5 (text/html)]

<div dir="ltr"><div>Thank you again, Phil! <br></div>I am attaching an updated version of my \
test case now that it works,<br>and has better docs and cleanup code. Hopefully this will   \
help others<br>who google for the things that are in my subject line.<br><br>​</div>


["testcase.py" (text/plain)]

from nose.tools import assert_equals
from nose.tools import assert_equal, assert_in, assert_raises
import sys
import time
from PyQt5.Qt import QObject, QApplication, pyqtSlot, QThread, QMetaObject, Qt, Q_ARG, \
Q_RETURN_ARG, qApp


class TestBlockingQueuedConnection:
    """
        Test to see if we can get return values from a blocking queued connection
        invoking a method in another thread. 
    """

    class SampleQObject(QObject):
        def __init__(self, name = "", parent=None):
            super().__init__(parent)
            self.setObjectName(name)
            self.counter = 0

        @pyqtSlot(str, result=int)
        def sample_slot_with_args(self, string: str):
            self.counter += 1
            print ("slot with args called: " + string)
            if self.counter > 5:
                self.thread().quit()
            return self.counter

        def __del__(self):
            print ("object cleaned up properly.")
            assert(True)

    @classmethod
    def setup_class(cls):
        """Called once, when this Test Case instantiated"""
        pass
        
    def setup(self):
        """Called before every test method: """
        pass

    def teardown(self):
        """Called after every test method: """
        pass

    def test_invoke_method(self):
        """ Test to show how QMetaObject.invokeMethod() works for communicating with an object \
in another thread.  """  self.app = QApplication.instance()
        if self.app is None:
            self.app = QApplication(sys.argv)
        self.other_thread_object = self.SampleQObject("b")

        # Set the parent of the QThread to ensure the QThread is cleaned up when the \
QApplication is:  self.thread = QThread(self.app)
        self.other_thread_object.moveToThread(self.thread)
        self.thread.start()

        # Clean up the heap object when the thread terminates.
        self.thread.finished.connect(self.other_thread_object.deleteLater)

        # Quit the app after that:
        self.thread.finished.connect(self.app.quit)

        # First get the metaobject, you can save this for multiple invocations.
        self.meta = self.other_thread_object.metaObject()
        # invokeMethod runs its slot in the other thread.
        for c in range(0, 6):
            time.sleep(0.5)
            return_val = self.meta.invokeMethod(self.other_thread_object, \
                "sample_slot_with_args",
                            Qt.BlockingQueuedConnection, Q_RETURN_ARG(int), Q_ARG(str, "invoke# \
" + str(c)))  print (str(return_val))

        self.app.exec()

    @classmethod
    def teardown_class(cls):
        """Called once, when this Test Case has completed"""
        pass
        


[Attachment #7 (text/plain)]

_______________________________________________
PyQt mailing list    PyQt@riverbankcomputing.com
http://www.riverbankcomputing.com/mailman/listinfo/pyqt

[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic