[prev in list] [next in list] [prev in thread] [next in thread] 

List:       sptk-commits
Subject:    r1631 - in trunk: examples/threads sptk4 src/utils
From:       alexey () mail ! total-knowledge ! com
Date:       2011-09-21 4:08:46
Message-ID: courier.000000004E79634E.00007236 () mail ! total-knowledge ! com
[Download RAW message or body]

Author: alexey
Date: 2011-09-20 21:08:46 -0700 (Tue, 20 Sep 2011)
New Revision: 1631

Modified:
   trunk/examples/threads/thread_pool_test.cpp
   trunk/sptk4/CThreadPool.h
   trunk/src/utils/CThreadPool.cpp
Log:
CThreadPool: added optional void* data to queue task method - to pass to taskFunction() as a parameter

Modified: trunk/examples/threads/thread_pool_test.cpp
===================================================================
--- trunk/examples/threads/thread_pool_test.cpp	2011-09-20 23:21:32 UTC (rev 1630)
+++ trunk/examples/threads/thread_pool_test.cpp	2011-09-21 04:08:46 UTC (rev 1631)
@@ -38,7 +38,7 @@
     CProxyLog           m_log;
 protected:
     // The thread function.
-    virtual void taskFunction();
+    virtual void taskFunction(void *data);
 
 public:
 
@@ -74,7 +74,7 @@
 }
 
 // The thread function. Prints a message once a second till terminated
-void CMyTask::taskFunction()
+void CMyTask::taskFunction(void*)
 {
     m_log << m_id << " is started\n";
 

Modified: trunk/sptk4/CThreadPool.h
===================================================================
--- trunk/sptk4/CThreadPool.h	2011-09-20 23:21:32 UTC (rev 1630)
+++ trunk/sptk4/CThreadPool.h	2011-09-21 04:08:46 UTC (rev 1631)
@@ -51,7 +51,8 @@
 ///
 /// Objects of this class maintain pool of threads
 /// And a queue of tasks, implemented as CThreadPool::Task derivatives
-class CThreadPool {
+class CThreadPool
+{
 private:
     /// @brief Pool thread class
     ///
@@ -106,18 +107,19 @@
         friend class CThreadPool::Thread;
 
     protected:
-        bool        m_autoDestruct; ///< If true then task is deleted upon completion
-        CWaiter     m_lock;         ///< Lock to protect task operations
-        Thread*     m_parentThread; ///< Parent thread executing the task, or NULL when task is idle
-        bool        m_terminated;   ///< Task termination flag (should be reset when task is executed)
-        CProxyLog*  m_taskLog;      ///< Task log (if defined in thread pool) (only during task execution)
+        bool                m_autoDestruct; ///< If true then task is deleted upon completion
+        CCriticalSection    m_taskLock;     ///< Lock to protect task operations
+        Thread*             m_parentThread; ///< Parent thread executing the task, or NULL when task is idle
+        bool                m_terminated;   ///< Task termination flag (should be reset when task is executed)
+        CProxyLog*          m_taskLog;      ///< Task log (if defined in thread pool) (only during task execution)
 
         /// @brief Internal method executing the task
         ///
         /// Locks the task, assigns parent thread to it, and executes task function
         /// @param thread Thread*, Thread that executes this task
+        /// @param data void*, Optional data passed in CThreadPool::queue() method
         /// @param log CProxyLog*, Optional log object
-        virtual void execute(Thread* thread, CProxyLog* log);
+        virtual void execute(Thread* thread, CProxyLog* log, void* data);
 
     public:
         /// @brief Destructor
@@ -135,7 +137,8 @@
         }
 
         /// @brief Runs the task
-        virtual void taskFunction()=0;
+        /// @param data void*, Optional data passed through
+        virtual void taskFunction(void* data)=0;
 
         /// @brief Returns true when task should be deleted upon completion
         bool autoDestruct() const
@@ -166,10 +169,7 @@
         }
 
         /// @brief Waits till this task is completed
-        ///
-        /// Negative timeout means 'forever'.
-        /// @param timeoutMS int32_t, Timeout interval in milliseconds
-        bool waitWhileRunning(int32_t timeoutMS=-1);
+        bool waitWhileRunning();
     };
 
     /// @brief Defines logging within the pool
@@ -182,11 +182,48 @@
 
 private:
 
-    typedef CSafeList<Task*>  tasklist_t;       ///< Task list type definition
+    /// @brief Defines task execution with optional data passed to task
+    struct TaskRunInfo
+    {
+        Task*   m_task;     ///< Task to execute
+        void*   m_data;     ///< Data to pass to the task
+        TaskRunInfo(Task* task, void* data=NULL) :
+            m_task(task), m_data(data)
+        {}
+    };
 
+    /// @brief Task list type definition
+    class TaskList : public CSafeList<TaskRunInfo>
+    {
+    public:
+        /// @brief Constructor
+        ///
+        /// If maxListItems is omitted or 0, the list is limited only with available memory.
+        /// Otherwise, when list is full, push() will block until a task leaves the list.
+        /// @param maxListItems uint32_t, Maximum number of tasks allowed in the list
+        TaskList(uint32_t maxListItems=0) : CSafeList(maxListItems) {}
+
+        /// @brief Removes a task from the list
+        ///
+        /// If the task exists in the list multiple times,
+        /// all copies of the task are removed.
+        /// @param task Task*, Task to remove
+        void eraseTask(Task* task)
+        {
+            CODE_GUARD(m_lock);
+            for (iterator itor = m_list->begin(); itor != m_list->end();) {
+                TaskRunInfo& taskRunInfo = *itor;
+                if (taskRunInfo.m_task == task)
+                    itor = m_list->erase(itor);
+                else
+                    itor++;
+            }
+        }
+    };
+
     CWaiter         m_lock;                     ///< Thread pool lock
     CThreadManager  m_threads;                  ///< Thread pool
-    tasklist_t      m_queue;                    ///< Task queue
+    TaskList        m_queue;                    ///< Task queue
     uint32_t        m_numThreads;               ///< Maximum number of threads
     uint32_t        m_threadStackSize;          ///< Optional thread stack size, or 0 for the system default
     bool            m_running;                  ///< Thread pool running flag
@@ -252,7 +289,8 @@
     /// TODO: Add an option to return error instead.
     /// @param task Task*, Task to add
     /// @param autoDestruct bool, If true then task is deleted upon completion
-    virtual void queue(Task* task, bool autoDestruct=true);
+    /// @param data void*, Optional data to pass to taskFunction()
+    virtual void queue(Task* task, bool autoDestruct=true, void* data=NULL);
 
     /// @brief Removes a task from the queue.
     ///

Modified: trunk/src/utils/CThreadPool.cpp
===================================================================
--- trunk/src/utils/CThreadPool.cpp	2011-09-20 23:21:32 UTC (rev 1630)
+++ trunk/src/utils/CThreadPool.cpp	2011-09-21 04:08:46 UTC (rev 1631)
@@ -42,24 +42,20 @@
 using namespace std;
 using namespace sptk;
 
-void CThreadPool::Task::execute(Thread* thread, CProxyLog* log)
+void CThreadPool::Task::execute(Thread* thread, CProxyLog* log, void* data)
 {
-    GUARD(m_lock);
+    CODE_GUARD(m_taskLock);
     m_parentThread = thread;
     m_taskLog = log;
-    taskFunction();
+    taskFunction(data);
     m_taskLog = NULL;
     m_parentThread = NULL;
 }
 
-bool CThreadPool::Task::waitWhileRunning(int32_t timeoutMS)
+bool CThreadPool::Task::waitWhileRunning()
 {
     try {
-        if (timeoutMS < 0)
-            m_lock.lock();
-        else
-            m_lock.lock(timeoutMS);
-        m_lock.unlock();
+        CODE_GUARD(m_taskLock);
         return true;
     } catch (exception&) {
         return false;
@@ -139,15 +135,16 @@
         *m_poolLog << CLP_DEBUG << "Thread pool stopped all threads.\n";
 }
 
-void CThreadPool::queue(CThreadPool::Task* t, bool autoDestruct)
+void CThreadPool::queue(CThreadPool::Task* t, bool autoDestruct, void* data)
 {
+    TaskRunInfo runInfo(t, data);
     t->m_autoDestruct = autoDestruct;
     m_queue.push_back(t);
 }
 
 void CThreadPool::erase(CThreadPool::Task* t)
 {
-    m_queue.erase(t);
+    m_queue.eraseTask(t);
     t->terminate();
     t->waitWhileRunning();
 }
@@ -184,11 +181,12 @@
 
 void CThreadPool::Thread::threadFunction()
 {
-    CThreadPool::Task* task(NULL);
+    TaskRunInfo taskRunInfo(NULL);
     while (!terminated()) {
         try {
-            if (m_pool->m_queue.pop_front(task)) {
-                task->execute(this, m_log);
+            if (m_pool->m_queue.pop_front(taskRunInfo)) {
+                Task* task = taskRunInfo.m_task;
+                task->execute(this, m_log, taskRunInfo.m_data);
                 if (task->autoDestruct())
                     delete task;
             }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic