[prev in list] [next in list] [prev in thread] [next in thread]
List: sptk-commits
Subject: r1631 - in trunk: examples/threads sptk4 src/utils
From: alexey () mail ! total-knowledge ! com
Date: 2011-09-21 4:08:46
Message-ID: courier.000000004E79634E.00007236 () mail ! total-knowledge ! com
[Download RAW message or body]
Author: alexey
Date: 2011-09-20 21:08:46 -0700 (Tue, 20 Sep 2011)
New Revision: 1631
Modified:
trunk/examples/threads/thread_pool_test.cpp
trunk/sptk4/CThreadPool.h
trunk/src/utils/CThreadPool.cpp
Log:
CThreadPool: added optional void* data to queue task method - to pass to \
taskFunction() as a parameter
Modified: trunk/examples/threads/thread_pool_test.cpp
===================================================================
--- trunk/examples/threads/thread_pool_test.cpp 2011-09-20 23:21:32 UTC (rev 1630)
+++ trunk/examples/threads/thread_pool_test.cpp 2011-09-21 04:08:46 UTC (rev 1631)
@@ -38,7 +38,7 @@
CProxyLog m_log;
protected:
// The thread function.
- virtual void taskFunction();
+ virtual void taskFunction(void *data);
public:
@@ -74,7 +74,7 @@
}
// The thread function. Prints a message once a second till terminated
-void CMyTask::taskFunction()
+void CMyTask::taskFunction(void*)
{
m_log << m_id << " is started\n";
Modified: trunk/sptk4/CThreadPool.h
===================================================================
--- trunk/sptk4/CThreadPool.h 2011-09-20 23:21:32 UTC (rev 1630)
+++ trunk/sptk4/CThreadPool.h 2011-09-21 04:08:46 UTC (rev 1631)
@@ -51,7 +51,8 @@
///
/// Objects of this class maintain pool of threads
/// And a queue of tasks, implemented as CThreadPool::Task derivatives
-class CThreadPool {
+class CThreadPool
+{
private:
/// @brief Pool thread class
///
@@ -106,18 +107,19 @@
friend class CThreadPool::Thread;
protected:
- bool m_autoDestruct; ///< If true then task is deleted upon \
completion
- CWaiter m_lock; ///< Lock to protect task operations
- Thread* m_parentThread; ///< Parent thread executing the task, or NULL \
when task is idle
- bool m_terminated; ///< Task termination flag (should be reset when \
task is executed)
- CProxyLog* m_taskLog; ///< Task log (if defined in thread pool) (only \
during task execution) + bool m_autoDestruct; ///< If true then \
task is deleted upon completion + CCriticalSection m_taskLock; ///< \
Lock to protect task operations + Thread* m_parentThread; ///< \
Parent thread executing the task, or NULL when task is idle + bool \
m_terminated; ///< Task termination flag (should be reset when task is executed) + \
CProxyLog* m_taskLog; ///< Task log (if defined in thread pool) (only \
during task execution)
/// @brief Internal method executing the task
///
/// Locks the task, assigns parent thread to it, and executes task function
/// @param thread Thread*, Thread that executes this task
+ /// @param data void*, Optional data passed in CThreadPool::queue() method
/// @param log CProxyLog*, Optional log object
- virtual void execute(Thread* thread, CProxyLog* log);
+ virtual void execute(Thread* thread, CProxyLog* log, void* data);
public:
/// @brief Destructor
@@ -135,7 +137,8 @@
}
/// @brief Runs the task
- virtual void taskFunction()=0;
+ /// @param data void*, Optional data passed through
+ virtual void taskFunction(void* data)=0;
/// @brief Returns true when task should be deleted upon completion
bool autoDestruct() const
@@ -166,10 +169,7 @@
}
/// @brief Waits till this task is completed
- ///
- /// Negative timeout means 'forever'.
- /// @param timeoutMS int32_t, Timeout interval in milliseconds
- bool waitWhileRunning(int32_t timeoutMS=-1);
+ bool waitWhileRunning();
};
/// @brief Defines logging within the pool
@@ -182,11 +182,48 @@
private:
- typedef CSafeList<Task*> tasklist_t; ///< Task list type definition
+ /// @brief Defines task execution with optional data passed to task
+ struct TaskRunInfo
+ {
+ Task* m_task; ///< Task to execute
+ void* m_data; ///< Data to pass to the task
+ TaskRunInfo(Task* task, void* data=NULL) :
+ m_task(task), m_data(data)
+ {}
+ };
+ /// @brief Task list type definition
+ class TaskList : public CSafeList<TaskRunInfo>
+ {
+ public:
+ /// @brief Constructor
+ ///
+ /// If maxListItems is omitted or 0, the list is limited only with available \
memory. + /// Otherwise, when list is full, push() will block until a task \
leaves the list. + /// @param maxListItems uint32_t, Maximum number of tasks \
allowed in the list + TaskList(uint32_t maxListItems=0) : \
CSafeList(maxListItems) {} +
+ /// @brief Removes a task from the list
+ ///
+ /// If the task exists in the list multiple times,
+ /// all copies of the task are removed.
+ /// @param task Task*, Task to remove
+ void eraseTask(Task* task)
+ {
+ CODE_GUARD(m_lock);
+ for (iterator itor = m_list->begin(); itor != m_list->end();) {
+ TaskRunInfo& taskRunInfo = *itor;
+ if (taskRunInfo.m_task == task)
+ itor = m_list->erase(itor);
+ else
+ itor++;
+ }
+ }
+ };
+
CWaiter m_lock; ///< Thread pool lock
CThreadManager m_threads; ///< Thread pool
- tasklist_t m_queue; ///< Task queue
+ TaskList m_queue; ///< Task queue
uint32_t m_numThreads; ///< Maximum number of threads
uint32_t m_threadStackSize; ///< Optional thread stack size, or \
0 for the system default
bool m_running; ///< Thread pool running flag
@@ -252,7 +289,8 @@
/// TODO: Add an option to return error instead.
/// @param task Task*, Task to add
/// @param autoDestruct bool, If true then task is deleted upon completion
- virtual void queue(Task* task, bool autoDestruct=true);
+ /// @param data void*, Optional data to pass to taskFunction()
+ virtual void queue(Task* task, bool autoDestruct=true, void* data=NULL);
/// @brief Removes a task from the queue.
///
Modified: trunk/src/utils/CThreadPool.cpp
===================================================================
--- trunk/src/utils/CThreadPool.cpp 2011-09-20 23:21:32 UTC (rev 1630)
+++ trunk/src/utils/CThreadPool.cpp 2011-09-21 04:08:46 UTC (rev 1631)
@@ -42,24 +42,20 @@
using namespace std;
using namespace sptk;
-void CThreadPool::Task::execute(Thread* thread, CProxyLog* log)
+void CThreadPool::Task::execute(Thread* thread, CProxyLog* log, void* data)
{
- GUARD(m_lock);
+ CODE_GUARD(m_taskLock);
m_parentThread = thread;
m_taskLog = log;
- taskFunction();
+ taskFunction(data);
m_taskLog = NULL;
m_parentThread = NULL;
}
-bool CThreadPool::Task::waitWhileRunning(int32_t timeoutMS)
+bool CThreadPool::Task::waitWhileRunning()
{
try {
- if (timeoutMS < 0)
- m_lock.lock();
- else
- m_lock.lock(timeoutMS);
- m_lock.unlock();
+ CODE_GUARD(m_taskLock);
return true;
} catch (exception&) {
return false;
@@ -139,15 +135,16 @@
*m_poolLog << CLP_DEBUG << "Thread pool stopped all threads.\n";
}
-void CThreadPool::queue(CThreadPool::Task* t, bool autoDestruct)
+void CThreadPool::queue(CThreadPool::Task* t, bool autoDestruct, void* data)
{
+ TaskRunInfo runInfo(t, data);
t->m_autoDestruct = autoDestruct;
m_queue.push_back(t);
}
void CThreadPool::erase(CThreadPool::Task* t)
{
- m_queue.erase(t);
+ m_queue.eraseTask(t);
t->terminate();
t->waitWhileRunning();
}
@@ -184,11 +181,12 @@
void CThreadPool::Thread::threadFunction()
{
- CThreadPool::Task* task(NULL);
+ TaskRunInfo taskRunInfo(NULL);
while (!terminated()) {
try {
- if (m_pool->m_queue.pop_front(task)) {
- task->execute(this, m_log);
+ if (m_pool->m_queue.pop_front(taskRunInfo)) {
+ Task* task = taskRunInfo.m_task;
+ task->execute(this, m_log, taskRunInfo.m_data);
if (task->autoDestruct())
delete task;
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic