[prev in list] [next in list] [prev in thread] [next in thread]
List: kde-commits
Subject: [kdelibs/frameworks] tier1/threadweaver: Ensure neither queue nor thread keep a reference to a previ
From: Mirko Boehm (Endocode) <mirko () endocode ! com>
Date: 2013-10-31 18:50:28
Message-ID: E1VbxK8-0000SU-Ly () scm ! kde ! org
[Download RAW message or body]
Git commit 3d5e9cdf1f349e04a3d5384700b7abb9aeb041f3 by Mirko Boehm (Endocode).
Committed on 31/10/2013 at 18:46.
Pushed by mirko into branch 'frameworks'.
Ensure neither queue nor thread keep a reference to a previous job.
See comments on JobTests::JobsAreDestroyedAfterFinish:
Verify that neither the queue nor the thread keep a reference to the job
after completing it.
This is necessary because user-allocated objects like queue policies may
be registered with the jobs. If the jobs stick around until the thread
or queue is deleted, the user-allocated objects may have gone out of
scope or been deleted already, causing potential errors. From
ThreadWeaver's point of view, a job ceases to exist once the processing
thread asks for the next job.
M +15 -4 tier1/threadweaver/autotests/JobTests.cpp
M +3 -3 tier1/threadweaver/src/Weaver/DestructedState.cpp
M +1 -1 tier1/threadweaver/src/Weaver/DestructedState.h
M +3 -3 tier1/threadweaver/src/Weaver/InConstructionState.cpp
M +1 -1 tier1/threadweaver/src/Weaver/InConstructionState.h
M +2 -2 tier1/threadweaver/src/Weaver/QueueInterface.h
M +3 -3 tier1/threadweaver/src/Weaver/ShuttingDownState.cpp
M +1 -1 tier1/threadweaver/src/Weaver/ShuttingDownState.h
M +3 -3 tier1/threadweaver/src/Weaver/SuspendedState.cpp
M +1 -1 tier1/threadweaver/src/Weaver/SuspendedState.h
M +3 -3 tier1/threadweaver/src/Weaver/SuspendingState.cpp
M +1 -1 tier1/threadweaver/src/Weaver/SuspendingState.h
M +15 -17 tier1/threadweaver/src/Weaver/Thread.cpp
M +7 -7 tier1/threadweaver/src/Weaver/WeaverImpl.cpp
M +2 -2 tier1/threadweaver/src/Weaver/WeaverImpl.h
M +5 -5 tier1/threadweaver/src/Weaver/WorkingHardState.cpp
M +1 -1 tier1/threadweaver/src/Weaver/WorkingHardState.h
http://commits.kde.org/kdelibs/3d5e9cdf1f349e04a3d5384700b7abb9aeb041f3
diff --git a/tier1/threadweaver/autotests/JobTests.cpp \
b/tier1/threadweaver/autotests/JobTests.cpp index 6dd1667..9b23d9b 100644
--- a/tier1/threadweaver/autotests/JobTests.cpp
+++ b/tier1/threadweaver/autotests/JobTests.cpp
@@ -867,22 +867,33 @@ struct InstanceCountedJob : public Job {
}
~InstanceCountedJob() {
- counter.fetchAndAddRelease(0);
+ counter.fetchAndAddRelease(-1);
}
};
QAtomicInt InstanceCountedJob::counter;
+/** @brief Verify that neither the queue nor the thread keep a reference to the job \
after completing it. + *
+ * This is necessary because user-allocated objects like queue policies may be \
registered with the jobs. If the jobs stick around + * until the thread or queue are \
deleted, the user-allocatd objects may have gone out of scope or been deleted \
already, causing + * potential errors. From ThreadWeaver's point of view, a job \
seizes to exist once the processing thread asks for the next job. */ void \
JobTests::JobsAreDestroyedAfterFinish() {
using namespace ThreadWeaver;
-
- WaitForIdleAndFinished w(Weaver::instance());
+ WaitForIdleAndFinished w(Weaver::instance()); Q_UNUSED(w);
Weaver::instance()->suspend();
- Weaver::instance()->enqueue(JobPointer(new InstanceCountedJob));
+ JobPointer job(new InstanceCountedJob);
+ Weaver::instance()->enqueue(job);
QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
Weaver::instance()->resume();
+ QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
Weaver::instance()->finish();
+ QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
+ QCoreApplication::processEvents();
+ QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 1);
+ job.clear();
+ // if this succeeds, job is the only shared pointer pointing to the created \
InstanceCountedJob object: QCOMPARE(InstanceCountedJob::counter.loadAcquire(), 0);
}
diff --git a/tier1/threadweaver/src/Weaver/DestructedState.cpp \
b/tier1/threadweaver/src/Weaver/DestructedState.cpp index 5b6fc87..4b81fa7 100644
--- a/tier1/threadweaver/src/Weaver/DestructedState.cpp
+++ b/tier1/threadweaver/src/Weaver/DestructedState.cpp
@@ -111,10 +111,10 @@ void DestructedState::resume()
{
}
-JobPointer DestructedState::applyForWork(Thread*, JobPointer previous)
+JobPointer DestructedState::applyForWork(Thread*, bool wasBusy)
{
- Q_UNUSED(previous) // except in Q_ASSERT
- Q_ASSERT(previous==0);
+ Q_UNUSED(wasBusy) // except in Q_ASSERT
+ Q_ASSERT(wasBusy==false);
return JobPointer();
}
diff --git a/tier1/threadweaver/src/Weaver/DestructedState.h \
b/tier1/threadweaver/src/Weaver/DestructedState.h index b222c1f..906f873 100644
--- a/tier1/threadweaver/src/Weaver/DestructedState.h
+++ b/tier1/threadweaver/src/Weaver/DestructedState.h
@@ -63,7 +63,7 @@ public:
void requestAbort() Q_DECL_OVERRIDE;
void suspend() Q_DECL_OVERRIDE;
void resume() Q_DECL_OVERRIDE;
- JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+ JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
void waitForAvailableJob ( Thread *th ) Q_DECL_OVERRIDE;
StateId stateId() const Q_DECL_OVERRIDE;
};
diff --git a/tier1/threadweaver/src/Weaver/InConstructionState.cpp \
b/tier1/threadweaver/src/Weaver/InConstructionState.cpp index 863612f..6605fd2 100644
--- a/tier1/threadweaver/src/Weaver/InConstructionState.cpp
+++ b/tier1/threadweaver/src/Weaver/InConstructionState.cpp
@@ -48,9 +48,9 @@ void InConstructionState::resume()
// this request is not handled in InConstruction state
}
-JobPointer InConstructionState::applyForWork(Thread *th, JobPointer previous)
+JobPointer InConstructionState::applyForWork(Thread *th, bool wasBusy)
{
- Q_ASSERT(previous==0);
+ Q_ASSERT(wasBusy==false);
// As long as we are in the construction state, no jobs will be given
// to the worker threads. The threads will be suspended. They will
// return from the blocked state when jobs are queued. By then, we
@@ -59,7 +59,7 @@ JobPointer InConstructionState::applyForWork(Thread *th, JobPointer \
previous) while (weaver()->state()->stateId() == InConstruction) {
weaver()->waitForAvailableJob(th);
}
- return weaver()->applyForWork(th, previous);
+ return weaver()->applyForWork(th, wasBusy);
}
StateId InConstructionState::stateId() const
diff --git a/tier1/threadweaver/src/Weaver/InConstructionState.h \
b/tier1/threadweaver/src/Weaver/InConstructionState.h index b96c9a9..5f5aaad 100644
--- a/tier1/threadweaver/src/Weaver/InConstructionState.h
+++ b/tier1/threadweaver/src/Weaver/InConstructionState.h
@@ -52,7 +52,7 @@ public:
/** Resume job processing. */
void resume() Q_DECL_OVERRIDE;
/** Assign a job to an idle thread. */
- JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+ JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
/** reimpl */
StateId stateId() const Q_DECL_OVERRIDE;
};
diff --git a/tier1/threadweaver/src/Weaver/QueueInterface.h \
b/tier1/threadweaver/src/Weaver/QueueInterface.h index ecfa477..0ae98b3 100644
--- a/tier1/threadweaver/src/Weaver/QueueInterface.h
+++ b/tier1/threadweaver/src/Weaver/QueueInterface.h
@@ -14,9 +14,9 @@ public:
/** Assign a job to an idle thread.
* @param th the thread to give a new Job to
- * @param previous the job this thread finished before calling
+ * @param wasBusy true if a job was previously assigned to the calling thread
*/
- virtual JobPointer applyForWork(Thread *th, JobPointer previous) = 0;
+ virtual JobPointer applyForWork(Thread *th, bool wasBusy) = 0;
/** Wait (by suspending the calling thread) until a job becomes available. */
virtual void waitForAvailableJob(Thread *th) = 0;
diff --git a/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp \
b/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp index e690912..e1590b2 100644
--- a/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp
+++ b/tier1/threadweaver/src/Weaver/ShuttingDownState.cpp
@@ -49,10 +49,10 @@ void ShuttingDownState::resume()
// ignored: when shutting down, we do not return from the suspended state
}
-JobPointer ShuttingDownState::applyForWork(Thread*, JobPointer previous)
+JobPointer ShuttingDownState::applyForWork(Thread*, bool wasBusy)
{
- Q_UNUSED(previous) // except in Q_ASSERT
- Q_ASSERT(previous==0);
+ Q_UNUSED(wasBusy) // except in Q_ASSERT
+ Q_ASSERT(wasBusy==false);
return JobPointer(); // tell threads to exit
}
diff --git a/tier1/threadweaver/src/Weaver/ShuttingDownState.h \
b/tier1/threadweaver/src/Weaver/ShuttingDownState.h index a6429c0..d929381 100644
--- a/tier1/threadweaver/src/Weaver/ShuttingDownState.h
+++ b/tier1/threadweaver/src/Weaver/ShuttingDownState.h
@@ -54,7 +54,7 @@ public:
/** Resume job processing. */
void resume() Q_DECL_OVERRIDE;
/** Assign a job to an idle thread. */
- JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+ JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
/** Wait (by suspending the calling thread) until a job becomes available. */
void waitForAvailableJob(Thread *th) Q_DECL_OVERRIDE;
/** reimpl */
diff --git a/tier1/threadweaver/src/Weaver/SuspendedState.cpp \
b/tier1/threadweaver/src/Weaver/SuspendedState.cpp index 342d26e..106a593 100644
--- a/tier1/threadweaver/src/Weaver/SuspendedState.cpp
+++ b/tier1/threadweaver/src/Weaver/SuspendedState.cpp
@@ -48,11 +48,11 @@ void SuspendedState::resume()
weaver()->setState(WorkingHard);
}
-JobPointer SuspendedState::applyForWork(Thread *th, JobPointer previous)
+JobPointer SuspendedState::applyForWork(Thread *th, bool wasBusy)
{ // suspend all threads in case they wake up:
- Q_ASSERT(previous==0);
+ Q_ASSERT(wasBusy==0);
weaver()->waitForAvailableJob(th);
- return weaver()->applyForWork(th, previous);
+ return weaver()->applyForWork(th, wasBusy);
}
StateId SuspendedState::stateId() const
diff --git a/tier1/threadweaver/src/Weaver/SuspendedState.h \
b/tier1/threadweaver/src/Weaver/SuspendedState.h index 9456a74..6743d86 100644
--- a/tier1/threadweaver/src/Weaver/SuspendedState.h
+++ b/tier1/threadweaver/src/Weaver/SuspendedState.h
@@ -49,7 +49,7 @@ public:
/** Resume job processing. */
void resume() Q_DECL_OVERRIDE;
/** Assign a job to an idle thread. */
- JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+ JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
/** reimpl */
StateId stateId() const Q_DECL_OVERRIDE;
};
diff --git a/tier1/threadweaver/src/Weaver/SuspendingState.cpp \
b/tier1/threadweaver/src/Weaver/SuspendingState.cpp index 2955852..8aec62c 100644
--- a/tier1/threadweaver/src/Weaver/SuspendingState.cpp
+++ b/tier1/threadweaver/src/Weaver/SuspendingState.cpp
@@ -54,11 +54,11 @@ void SuspendingState::activated()
weaver()->reschedule();
}
-JobPointer SuspendingState::applyForWork(Thread *th, JobPointer previous)
+JobPointer SuspendingState::applyForWork(Thread *th, bool wasBusy)
{
- weaver()->takeFirstAvailableJobOrSuspendOrWait(th, previous, true, true);
+ weaver()->takeFirstAvailableJobOrSuspendOrWait(th, wasBusy, true, true);
weaver()->waitForAvailableJob(th);
- return weaver()->applyForWork(th, JobPointer());
+ return weaver()->applyForWork(th, false);
}
StateId SuspendingState::stateId() const
diff --git a/tier1/threadweaver/src/Weaver/SuspendingState.h \
b/tier1/threadweaver/src/Weaver/SuspendingState.h index 5b205ce..7abe76d 100644
--- a/tier1/threadweaver/src/Weaver/SuspendingState.h
+++ b/tier1/threadweaver/src/Weaver/SuspendingState.h
@@ -50,7 +50,7 @@ public:
/** Resume job processing. */
void resume() Q_DECL_OVERRIDE;
/** Assign a job to an idle thread. */
- JobPointer applyForWork(Thread *th, JobPointer previous) Q_DECL_OVERRIDE;
+ JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
/** Overload. */
void activated() Q_DECL_OVERRIDE;
/** reimpl */
diff --git a/tier1/threadweaver/src/Weaver/Thread.cpp \
b/tier1/threadweaver/src/Weaver/Thread.cpp index 7de6f9c..8064583 100644
--- a/tier1/threadweaver/src/Weaver/Thread.cpp
+++ b/tier1/threadweaver/src/Weaver/Thread.cpp
@@ -95,26 +95,26 @@ void Thread::run()
emit started(this);
debug(3, "Thread::run [%u]: running.\n", id());
+ bool wasBusy = false;
while (true) {
debug(3, "Thread::run [%u]: trying to execute the next job.\n", id());
- JobPointer oldJob;
- {
- QMutexLocker l(&d->mutex); Q_UNUSED(l);
- oldJob = d->job; d->job.clear();
- }
- JobPointer newJob = d->parent->applyForWork(this, oldJob);
- if (newJob == 0) {
- break;
+ // the assignment is intentional: newJob needs to go out of scope at the end \
of the if statement + if (JobPointer newJob = d->parent->applyForWork(this, \
wasBusy)) { + QMutexLocker l(&d->mutex); Q_UNUSED(l);
+ d->job = newJob;
} else {
- {
- QMutexLocker l(&d->mutex); Q_UNUSED(l);
- d->job = newJob;
- }
- emit jobStarted(newJob, this);
- newJob->execute(newJob, this);
- emit jobDone(newJob);
+ break;
}
+
+ wasBusy = true;
+
+ emit jobStarted(d->job, this);
+ d->job->execute(d->job, this);
+ emit jobDone(d->job);
+
+ QMutexLocker l(&d->mutex); Q_UNUSED(l);
+ d->job.clear();
}
debug ( 3, "Thread::run [%u]: exiting.\n", id() );
}
@@ -124,8 +124,6 @@ void Thread::requestAbort ()
QMutexLocker l(&d->mutex); Q_UNUSED(l);
if (d->job) {
d->job->requestAbort();
- } else {
- qDebug ( "Thread::requestAbort: not running." );
}
}
diff --git a/tier1/threadweaver/src/Weaver/WeaverImpl.cpp \
b/tier1/threadweaver/src/Weaver/WeaverImpl.cpp index 56afea3..724d757 100644
--- a/tier1/threadweaver/src/Weaver/WeaverImpl.cpp
+++ b/tier1/threadweaver/src/Weaver/WeaverImpl.cpp
@@ -503,16 +503,16 @@ void WeaverImpl::threadEnteredRun(Thread *thread)
emit threadStarted(thread);
}
-JobPointer WeaverImpl::takeFirstAvailableJobOrSuspendOrWait(Thread *th, JobPointer \
previous, +JobPointer WeaverImpl::takeFirstAvailableJobOrSuspendOrWait(Thread *th, \
bool threadWasBusy,
bool suspendIfInactive, \
bool justReturning) {
QMutexLocker l (m_mutex); Q_UNUSED(l);
- Q_ASSERT(previous==0 || (previous != 0 && m_active > 0));
+ Q_ASSERT(threadWasBusy==false || (threadWasBusy == true && m_active > 0));
debug(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to \
thread %i (%s state).\n", th->id(), qPrintable(state()->stateName()));
- debug(5, "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, previous: \
%p, suspend?: %s, assign new job?: %s.\n",
- activeThreadCount(), previous.data(), suspendIfInactive ? "yes" : "no", \
!justReturning ? "yes" : "no");
- if (previous) {
+ debug(5, "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: \
%s, suspend: %s, assign new job: %s.\n", + activeThreadCount(), \
threadWasBusy ? "yes" : "no", suspendIfInactive ? "yes" : "no", !justReturning ? \
"yes" : "no"); + if (threadWasBusy) {
// cleanup and send events:
decActiveThreadCount();
}
@@ -547,9 +547,9 @@ JobPointer \
WeaverImpl::takeFirstAvailableJobOrSuspendOrWait(Thread *th, JobPoint return \
JobPointer(); }
-JobPointer WeaverImpl::applyForWork(Thread *th, JobPointer previous)
+JobPointer WeaverImpl::applyForWork(Thread *th, bool wasBusy)
{
- return state()->applyForWork(th, previous);
+ return state()->applyForWork(th, wasBusy);
}
void WeaverImpl::waitForAvailableJob(Thread* th)
diff --git a/tier1/threadweaver/src/Weaver/WeaverImpl.h \
b/tier1/threadweaver/src/Weaver/WeaverImpl.h index 7e5eab2..a86b91b 100644
--- a/tier1/threadweaver/src/Weaver/WeaverImpl.h
+++ b/tier1/threadweaver/src/Weaver/WeaverImpl.h
@@ -99,7 +99,7 @@ public:
met.
In *previous*, threads give the job they have completed. If this is
the first job, previous is zero. */
- virtual JobPointer applyForWork (Thread *thread, JobPointer previous) \
Q_DECL_OVERRIDE; + virtual JobPointer applyForWork (Thread *thread, bool wasBusy) \
Q_DECL_OVERRIDE; /** Wait for a job to become available. */
void waitForAvailableJob(Thread *th) Q_DECL_OVERRIDE;
/** Blocks the calling thread until some actor calls assignJobs. */
@@ -124,7 +124,7 @@ public:
* available. If only jobs that depened on other, unfinished jobs are in the \
queue, this method blocks on m_jobAvailable.
* Go to suspended state if the active thread count is now zero and \
suspendIfAllThreadsInactive is true.
* If justReturning is true, do not assign a new job, just process the completed \
previous one. */
- JobPointer takeFirstAvailableJobOrSuspendOrWait(Thread* th, JobPointer previous,
+ JobPointer takeFirstAvailableJobOrSuspendOrWait(Thread* th, bool threadWasBusy,
bool \
suspendIfAllThreadsInactive, bool justReturning); void requestAbort() \
Q_DECL_OVERRIDE;
diff --git a/tier1/threadweaver/src/Weaver/WorkingHardState.cpp \
b/tier1/threadweaver/src/Weaver/WorkingHardState.cpp index e5acf6f..347238d 100644
--- a/tier1/threadweaver/src/Weaver/WorkingHardState.cpp
+++ b/tier1/threadweaver/src/Weaver/WorkingHardState.cpp
@@ -58,18 +58,18 @@ void WorkingHardState::resume()
{
}
-JobPointer WorkingHardState::applyForWork(Thread *th, JobPointer previous)
+JobPointer WorkingHardState::applyForWork(Thread *th, bool wasBusy)
{ // beware: this code is executed in the applying thread!
debug(2, "WorkingHardState::applyForWork: thread %i applies for work in %s \
state.\n", th->id(),
- qPrintable ( weaver()->state()->stateName() ) );
- JobPointer next = weaver()->takeFirstAvailableJobOrSuspendOrWait(th, previous, \
false, false); + qPrintable(weaver()->state()->stateName()));
+ JobPointer next = weaver()->takeFirstAvailableJobOrSuspendOrWait(th, wasBusy, \
false, false); if (next) {
return next;
} else {
// this is no infinite recursion: the state may have changed meanwhile, or \
jobs may have become available:
debug(2, "WorkingHardState::applyForWork: repeating for thread %i in %s \
state.\n", th->id(),
- qPrintable ( weaver()->state()->stateName() ) );
- return weaver()->applyForWork(th, JobPointer());
+ qPrintable(weaver()->state()->stateName()));
+ return weaver()->applyForWork(th, false);
}
}
diff --git a/tier1/threadweaver/src/Weaver/WorkingHardState.h \
b/tier1/threadweaver/src/Weaver/WorkingHardState.h index a222832..74bb434 100644
--- a/tier1/threadweaver/src/Weaver/WorkingHardState.h
+++ b/tier1/threadweaver/src/Weaver/WorkingHardState.h
@@ -49,7 +49,7 @@ public:
/** Resume job processing. */
void resume() Q_DECL_OVERRIDE;
/** Assign a job to an idle thread. */
- JobPointer applyForWork(Thread *th, JobPointer previous);
+ JobPointer applyForWork(Thread *th, bool wasBusy) Q_DECL_OVERRIDE;
/** Overload. */
void activated() Q_DECL_OVERRIDE;
/** reimpl */
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic