[prev in list] [next in list] [prev in thread] [next in thread] 

List:       kde-commits
Subject:    [kdelibs/frameworks] tier1/threadweaver: Ensure neither queue nor thread keep a reference to a previ
From:       Mirko Boehm (Endocode) <mirko () endocode ! com>
Date:       2013-10-31 18:50:28
Message-ID: E1VbxK8-0000SU-Ly () scm ! kde ! org
[Download RAW message or body]

Git commit 3d5e9cdf1f349e04a3d5384700b7abb9aeb041f3 by Mirko Boehm (Endocode).
Committed on 31/10/2013 at 18:46.
Pushed by mirko into branch 'frameworks'.

Ensure neither queue nor thread keep a reference to a previous job.

See comments on JobTests::JobsAreDestroyedAfterFinish:

Verify that neither the queue nor the thread keep a reference to the job
after completing it.

This is necessary because user-allocated objects like queue policies may
be registered with the jobs. If the jobs stick around until the thread
or queue are deleted, the user-allocated objects may have gone out of
scope or been deleted already, causing potential errors. From
ThreadWeaver's point of view, a job seizes to exist once the processing
thread asks for the next job.

M  +15   -4    tier1/threadweaver/autotests/JobTests.cpp
M  +3    -3    tier1/threadweaver/src/Weaver/DestructedState.cpp
M  +1    -1    tier1/threadweaver/src/Weaver/DestructedState.h
M  +3    -3    tier1/threadweaver/src/Weaver/InConstructionState.cpp
M  +1    -1    tier1/threadweaver/src/Weaver/InConstructionState.h
M  +2    -2    tier1/threadweaver/src/Weaver/QueueInterface.h
M  +3    -3    tier1/threadweaver/src/Weaver/ShuttingDownState.cpp
M  +1    -1    tier1/threadweaver/src/Weaver/ShuttingDownState.h
M  +3    -3    tier1/threadweaver/src/Weaver/SuspendedState.cpp
M  +1    -1    tier1/threadweaver/src/Weaver/SuspendedState.h
M  +3    -3    tier1/threadweaver/src/Weaver/SuspendingState.cpp
M  +1    -1    tier1/threadweaver/src/Weaver/SuspendingState.h
M  +15   -17   tier1/threadweaver/src/Weaver/Thread.cpp
M  +7    -7    tier1/threadweaver/src/Weaver/WeaverImpl.cpp
M  +2    -2    tier1/threadweaver/src/Weaver/WeaverImpl.h
M  +5    -5    tier1/threadweaver/src/Weaver/WorkingHardState.cpp
M  +1    -1    tier1/threadweaver/src/Weaver/WorkingHardState.h

http://commits.kde.org/kdelibs/3d5e9cdf1f349e04a3d5384700b7abb9aeb041f3

diff --git a/tier1/threadweaver/autotests/JobTests.cpp \
b/tier1/threadweaver/autotests/JobTests.cpp index 6dd1667..9b23d9b 100644
--- a/tier1/threadweaver/autotests/JobTests.cpp
+++ b/tier1/threadweaver/autotests/JobTests.cpp
@@ -867,22 +867,33 @@ struct InstanceCountedJob : public Job {
     }
 
     ~InstanceCountedJob() {
-        counter.fetchAndAddRelease(0);
+        counter.fetchAndAddRelease(-1);
     }
 };
 
 QAtomicInt InstanceCountedJob::counter;
 
+/** @brief Verify that neither the queue nor the thread keep a reference to the job \
after completing it. + *
+ * This is necessary because user-allocated objects like queue policies may be \
registered with the jobs. If the jobs stick around + * until the thread or queue are \
deleted, the user-allocatd objects may have gone out of scope or been deleted \
already, causing + * potential errors. From ThreadWeaver's point of view, a job \
seizes to exist once the processing thread asks for the next job. */  void \
JobTests::JobsAreDestroyedAfterFinish()  {
     using namespace ThreadWeaver;
-
-    WaitForIdleAndFinished w(Weaver::instance());
+    WaitForIdleAndFinished w(Weaver::instance()); Q_UNUSED(w);
     Weaver::instance()->suspend();
-    Weaver::instance()->enqueue(JobPointer(new InstanceCountedJob));
+    JobPointer job(new InstanceCountedJob);
+    Weaver::instance()->enqueue(job);
     QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
     Weaver::instance()->resume();
+    QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
     Weaver::instance()->finish();
+    QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
+    QCoreApplication::processEvents();
+    QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
+    job.clear();
+    // if this succeeds, job is the only shared pointer pointing to the created \
InstanceCountedJob object:  QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 0);
 }
 
diff --git a/tier1/threadweaver/src/Weaver/DestructedState.cpp \
b/tier1/threadweaver/src/Weaver/DestructedState.cpp index 5b6fc87..4b81fa7 100644
--- a/tier1/threadweaver/src/Weaver/DestructedState.cpp
+++ b/tier1/threadweaver/src/Weaver/DestructedState.cpp
@@ -111,10 +111,10 @@ void DestructedState::resume()
 {
 }
 
-JobPointer DestructedState::applyForWork(Thread*, JobPointer previous)
+JobPointer DestructedState::applyForWork(Thread*, bool wasBusy)
 {
-    Q_UNUSED(previous) // except in Q_ASSERT
-    Q_ASSERT(previous==0);
+    Q_UNUSED(wasBusy) // except in Q_ASSERT
+    Q_ASSERT(wasBusy==false);
     return JobPointer();
 }
 
diff --git a/tier1/threadweaver/src/Weaver/DestructedState.h \
b/tier1/threadweaver/src/Weaver/DestructedState.h index b222c1f..906f873 100644
--- a/tier1/threadweaver/src/Weaver/DestructedState.h
+++ b/tier1/threadweaver/src/Weaver/DestructedState.h
@@ -63,7 +63,7 @@ public:
     void requestAbort() Q_DECL_OVERRIDE;
     void suspend() Q_DECL_OVERRIDE;
     void resume() Q_DECL_OVERRIDE;
-    JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+    JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
     void waitForAvailableJob ( Thread *th ) Q_DECL_OVERRIDE;
     StateId stateId() const Q_DECL_OVERRIDE;
 };
diff --git a/tier1/threadweaver/src/Weaver/InConstructionState.cpp \
b/tier1/threadweaver/src/Weaver/InConstructionState.cpp index 863612f..6605fd2 100644
--- a/tier1/threadweaver/src/Weaver/InConstructionState.cpp
+++ b/tier1/threadweaver/src/Weaver/InConstructionState.cpp
@@ -48,9 +48,9 @@ void InConstructionState::resume()
     // this request is not handled in InConstruction state
 }
 
-JobPointer InConstructionState::applyForWork(Thread *th, JobPointer previous)
+JobPointer InConstructionState::applyForWork(Thread *th, bool wasBusy)
 {
-    Q_ASSERT(previous==0);
+    Q_ASSERT(wasBusy==false);
     // As long as we are in the construction state, no jobs will be given
     // to the worker threads. The threads will be suspended. They will
     // return from the blocked state when jobs are queued. By then, we
@@ -59,7 +59,7 @@ JobPointer InConstructionState::applyForWork(Thread *th, JobPointer \
previous)  while (weaver()->state()->stateId() == InConstruction) {
         weaver()->waitForAvailableJob(th);
     }
-    return weaver()->applyForWork(th, previous);
+    return weaver()->applyForWork(th, wasBusy);
 }
 
 StateId InConstructionState::stateId() const
diff --git a/tier1/threadweaver/src/Weaver/InConstructionState.h \
b/tier1/threadweaver/src/Weaver/InConstructionState.h index b96c9a9..5f5aaad 100644
--- a/tier1/threadweaver/src/Weaver/InConstructionState.h
+++ b/tier1/threadweaver/src/Weaver/InConstructionState.h
@@ -52,7 +52,7 @@ public:
     /** Resume job processing. */
     void resume() Q_DECL_OVERRIDE;
     /** Assign a job to an idle thread. */
-    JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+    JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
     /** reimpl */
     StateId stateId() const Q_DECL_OVERRIDE;
 };
diff --git a/tier1/threadweaver/src/Weaver/QueueInterface.h \
b/tier1/threadweaver/src/Weaver/QueueInterface.h index ecfa477..0ae98b3 100644
--- a/tier1/threadweaver/src/Weaver/QueueInterface.h
+++ b/tier1/threadweaver/src/Weaver/QueueInterface.h
@@ -14,9 +14,9 @@ public:
 
     /** Assign a job to an idle thread.
      * @param th the thread to give a new Job to
-     * @param previous the job this thread finished before calling
+     * @param wasBusy true if a job was previously assigned to the calling thread
      */
-    virtual JobPointer applyForWork(Thread *th, JobPointer previous) = 0;
+    virtual JobPointer applyForWork(Thread *th, bool wasBusy) = 0;
 
     /** Wait (by suspending the calling thread) until a job becomes available. */
     virtual void waitForAvailableJob(Thread *th) = 0;
diff --git a/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp \
b/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp index e690912..e1590b2 100644
--- a/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp
+++ b/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp
@@ -49,10 +49,10 @@ void ShuttingDownState::resume()
     // ignored: when shutting down, we do not return from the suspended state
 }
 
-JobPointer ShuttingDownState::applyForWork(Thread*, JobPointer previous)
+JobPointer ShuttingDownState::applyForWork(Thread*, bool wasBusy)
 {
-    Q_UNUSED(previous) // except in Q_ASSERT
-    Q_ASSERT(previous==0);
+    Q_UNUSED(wasBusy) // except in Q_ASSERT
+    Q_ASSERT(wasBusy==false);
     return JobPointer();  // tell threads to exit
 }
 
diff --git a/tier1/threadweaver/src/Weaver/ShuttingDownState.h \
b/tier1/threadweaver/src/Weaver/ShuttingDownState.h index a6429c0..d929381 100644
--- a/tier1/threadweaver/src/Weaver/ShuttingDownState.h
+++ b/tier1/threadweaver/src/Weaver/ShuttingDownState.h
@@ -54,7 +54,7 @@ public:
     /** Resume job processing. */
     void resume() Q_DECL_OVERRIDE;
     /** Assign a job to an idle thread. */
-    JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+    JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
     /** Wait (by suspending the calling thread) until a job becomes available. */
     void waitForAvailableJob(Thread *th) Q_DECL_OVERRIDE;
     /** reimpl */
diff --git a/tier1/threadweaver/src/Weaver/SuspendedState.cpp \
b/tier1/threadweaver/src/Weaver/SuspendedState.cpp index 342d26e..106a593 100644
--- a/tier1/threadweaver/src/Weaver/SuspendedState.cpp
+++ b/tier1/threadweaver/src/Weaver/SuspendedState.cpp
@@ -48,11 +48,11 @@ void SuspendedState::resume()
     weaver()->setState(WorkingHard);
 }
 
-JobPointer SuspendedState::applyForWork(Thread *th, JobPointer previous)
+JobPointer SuspendedState::applyForWork(Thread *th, bool wasBusy)
 {   // suspend all threads in case they wake up:
-    Q_ASSERT(previous==0);
+    Q_ASSERT(wasBusy==0);
     weaver()->waitForAvailableJob(th);
-    return weaver()->applyForWork(th, previous);
+    return weaver()->applyForWork(th, wasBusy);
 }
 
 StateId SuspendedState::stateId() const
diff --git a/tier1/threadweaver/src/Weaver/SuspendedState.h \
b/tier1/threadweaver/src/Weaver/SuspendedState.h index 9456a74..6743d86 100644
--- a/tier1/threadweaver/src/Weaver/SuspendedState.h
+++ b/tier1/threadweaver/src/Weaver/SuspendedState.h
@@ -49,7 +49,7 @@ public:
     /** Resume job processing. */
     void resume() Q_DECL_OVERRIDE;
     /** Assign a job to an idle thread. */
-    JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+    JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
     /** reimpl */
     StateId stateId() const Q_DECL_OVERRIDE;
 };
diff --git a/tier1/threadweaver/src/Weaver/SuspendingState.cpp \
b/tier1/threadweaver/src/Weaver/SuspendingState.cpp index 2955852..8aec62c 100644
--- a/tier1/threadweaver/src/Weaver/SuspendingState.cpp
+++ b/tier1/threadweaver/src/Weaver/SuspendingState.cpp
@@ -54,11 +54,11 @@ void SuspendingState::activated()
     weaver()->reschedule();
 }
 
-JobPointer SuspendingState::applyForWork(Thread *th, JobPointer previous)
+JobPointer SuspendingState::applyForWork(Thread *th, bool wasBusy)
 {
-    weaver()->takeFirstAvailableJobOrSuspendOrWait(th, previous, true, true);
+    weaver()->takeFirstAvailableJobOrSuspendOrWait(th, wasBusy, true, true);
     weaver()->waitForAvailableJob(th);
-    return weaver()->applyForWork(th, JobPointer());
+    return weaver()->applyForWork(th, false);
 }
 
 StateId SuspendingState::stateId() const
diff --git a/tier1/threadweaver/src/Weaver/SuspendingState.h \
b/tier1/threadweaver/src/Weaver/SuspendingState.h index 5b205ce..7abe76d 100644
--- a/tier1/threadweaver/src/Weaver/SuspendingState.h
+++ b/tier1/threadweaver/src/Weaver/SuspendingState.h
@@ -50,7 +50,7 @@ public:
     /** Resume job processing. */
     void resume() Q_DECL_OVERRIDE;
     /** Assign a job to an idle thread. */
-    JobPointer applyForWork(Thread *th,  JobPointer previous) Q_DECL_OVERRIDE;
+    JobPointer applyForWork(Thread *th,  bool wasBusy) Q_DECL_OVERRIDE;
     /** Overload. */
     void activated() Q_DECL_OVERRIDE;
     /** reimpl */
diff --git a/tier1/threadweaver/src/Weaver/Thread.cpp \
b/tier1/threadweaver/src/Weaver/Thread.cpp index 7de6f9c..8064583 100644
--- a/tier1/threadweaver/src/Weaver/Thread.cpp
+++ b/tier1/threadweaver/src/Weaver/Thread.cpp
@@ -95,26 +95,26 @@ void Thread::run()
     emit started(this);
     debug(3, "Thread::run [%u]: running.\n", id());
 
+    bool wasBusy = false;
     while (true) {
         debug(3, "Thread::run [%u]: trying to execute the next job.\n", id());
-        JobPointer oldJob;
-        {
-            QMutexLocker l(&d->mutex); Q_UNUSED(l);
-            oldJob = d->job; d->job.clear();
-        }
-        JobPointer newJob = d->parent->applyForWork(this, oldJob);
 
-        if (newJob == 0) {
-            break;
+        // the assignment is intentional: newJob needs to go out of scope at the end \
of the if statement +        if (JobPointer newJob = d->parent->applyForWork(this, \
wasBusy)) { +            QMutexLocker l(&d->mutex); Q_UNUSED(l);
+            d->job = newJob;
         } else {
-            {
-                QMutexLocker l(&d->mutex); Q_UNUSED(l);
-                d->job = newJob;
-            }
-            emit jobStarted(newJob, this);
-            newJob->execute(newJob, this);
-            emit jobDone(newJob);
+            break;
         }
+
+        wasBusy = true;
+
+        emit jobStarted(d->job, this);
+        d->job->execute(d->job, this);
+        emit jobDone(d->job);
+
+        QMutexLocker l(&d->mutex); Q_UNUSED(l);
+        d->job.clear();
     }
     debug ( 3, "Thread::run [%u]: exiting.\n", id() );
 }
@@ -124,8 +124,6 @@ void Thread::requestAbort ()
     QMutexLocker l(&d->mutex); Q_UNUSED(l);
     if (d->job) {
         d->job->requestAbort();
-    } else {
-        qDebug ( "Thread::requestAbort: not running." );
     }
 }
 
diff --git a/tier1/threadweaver/src/Weaver/WeaverImpl.cpp \
b/tier1/threadweaver/src/Weaver/WeaverImpl.cpp index 56afea3..724d757 100644
--- a/tier1/threadweaver/src/Weaver/WeaverImpl.cpp
+++ b/tier1/threadweaver/src/Weaver/WeaverImpl.cpp
@@ -503,16 +503,16 @@ void WeaverImpl::threadEnteredRun(Thread *thread)
     emit threadStarted(thread);
 }
 
-JobPointer WeaverImpl::takeFirstAvailableJobOrSuspendOrWait(Thread *th, JobPointer \
previous, +JobPointer WeaverImpl::takeFirstAvailableJobOrSuspendOrWait(Thread *th, \
                bool threadWasBusy,
                                                             bool suspendIfInactive, \
bool justReturning)  {
     QMutexLocker l (m_mutex); Q_UNUSED(l);
-    Q_ASSERT(previous==0 || (previous != 0 && m_active > 0));
+    Q_ASSERT(threadWasBusy==false || (threadWasBusy == true && m_active > 0));
     debug(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to \
thread %i (%s state).\n",  th->id(), qPrintable(state()->stateName()));
-    debug(5, "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, previous: \
                %p, suspend?: %s, assign new job?: %s.\n",
-          activeThreadCount(), previous.data(), suspendIfInactive ? "yes" : "no", \
                !justReturning ? "yes" : "no");
-    if (previous) {
+    debug(5, "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: \
%s, suspend: %s, assign new job: %s.\n", +          activeThreadCount(), \
threadWasBusy ? "yes" : "no", suspendIfInactive ? "yes" : "no", !justReturning ? \
"yes" : "no"); +    if (threadWasBusy) {
         // cleanup and send events:
         decActiveThreadCount();
     }
@@ -547,9 +547,9 @@ JobPointer \
WeaverImpl::takeFirstAvailableJobOrSuspendOrWait(Thread *th, JobPoint  return \
JobPointer();  }
 
-JobPointer WeaverImpl::applyForWork(Thread *th, JobPointer previous)
+JobPointer WeaverImpl::applyForWork(Thread *th, bool wasBusy)
 {
-    return state()->applyForWork(th, previous);
+    return state()->applyForWork(th, wasBusy);
 }
 
 void WeaverImpl::waitForAvailableJob(Thread* th)
diff --git a/tier1/threadweaver/src/Weaver/WeaverImpl.h \
b/tier1/threadweaver/src/Weaver/WeaverImpl.h index 7e5eab2..a86b91b 100644
--- a/tier1/threadweaver/src/Weaver/WeaverImpl.h
+++ b/tier1/threadweaver/src/Weaver/WeaverImpl.h
@@ -99,7 +99,7 @@ public:
         met.
         In *previous*, threads give the job they have completed. If this is
         the first job, previous is zero. */
-    virtual JobPointer applyForWork (Thread *thread, JobPointer previous) \
Q_DECL_OVERRIDE; +    virtual JobPointer applyForWork (Thread *thread, bool wasBusy) \
Q_DECL_OVERRIDE;  /** Wait for a job to become available. */
     void waitForAvailableJob(Thread *th) Q_DECL_OVERRIDE;
     /** Blocks the calling thread until some actor calls assignJobs. */
@@ -124,7 +124,7 @@ public:
      * available. If only jobs that depened on other, unfinished jobs are in the \
                queue, this method blocks on m_jobAvailable.
      * Go to suspended state if the active thread count is now zero and \
                suspendIfAllThreadsInactive is true.
      * If justReturning is true, do not assign a new job, just process the completed \
                previous one. */
-    JobPointer takeFirstAvailableJobOrSuspendOrWait(Thread* th, JobPointer previous,
+    JobPointer takeFirstAvailableJobOrSuspendOrWait(Thread* th, bool threadWasBusy,
                                                     bool \
suspendIfAllThreadsInactive, bool justReturning);  void requestAbort() \
Q_DECL_OVERRIDE;  
diff --git a/tier1/threadweaver/src/Weaver/WorkingHardState.cpp \
b/tier1/threadweaver/src/Weaver/WorkingHardState.cpp index e5acf6f..347238d 100644
--- a/tier1/threadweaver/src/Weaver/WorkingHardState.cpp
+++ b/tier1/threadweaver/src/Weaver/WorkingHardState.cpp
@@ -58,18 +58,18 @@ void WorkingHardState::resume()
 {
 }
 
-JobPointer WorkingHardState::applyForWork(Thread *th,  JobPointer previous)
+JobPointer WorkingHardState::applyForWork(Thread *th,  bool wasBusy)
 {   // beware: this code is executed in the applying thread!
     debug(2, "WorkingHardState::applyForWork: thread %i applies for work in %s \
                state.\n", th->id(),
-          qPrintable ( weaver()->state()->stateName() ) );
-    JobPointer next = weaver()->takeFirstAvailableJobOrSuspendOrWait(th, previous, \
false, false); +          qPrintable(weaver()->state()->stateName()));
+    JobPointer next = weaver()->takeFirstAvailableJobOrSuspendOrWait(th, wasBusy, \
false, false);  if (next) {
         return next;
     } else {
         // this is no infinite recursion: the state may have changed meanwhile, or \
                jobs may have become available:
         debug(2, "WorkingHardState::applyForWork: repeating for thread %i in %s \
                state.\n", th->id(),
-              qPrintable ( weaver()->state()->stateName() ) );
-        return weaver()->applyForWork(th, JobPointer());
+              qPrintable(weaver()->state()->stateName()));
+        return weaver()->applyForWork(th, false);
     }
 }
 
diff --git a/tier1/threadweaver/src/Weaver/WorkingHardState.h \
b/tier1/threadweaver/src/Weaver/WorkingHardState.h index a222832..74bb434 100644
--- a/tier1/threadweaver/src/Weaver/WorkingHardState.h
+++ b/tier1/threadweaver/src/Weaver/WorkingHardState.h
@@ -49,7 +49,7 @@ public:
     /** Resume job processing. */
     void resume() Q_DECL_OVERRIDE;
     /** Assign a job to an idle thread. */
-    JobPointer applyForWork(Thread *th, JobPointer previous);
+    JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
     /** Overload. */
     void activated() Q_DECL_OVERRIDE;
     /** reimpl */


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic