ThreadPool.cpp

Go to the documentation of this file.
00001 #include "ThreadPool.h"
00002 #include <iostream>
00003 
00004 using std::cout ;
00005 using std::endl ;
00006 using std::cerr;
00007 
00008 WorkerMainLoop::WorkerMainLoop(WorkerState *st,
00009                                boost::condition &c1, boost::mutex &m1,
00010                                boost::condition &c2, boost::mutex &m2)
00011         : cond1(c1), mtx1(m1), cond2(c2), mtx2(m2)
00012 {
00013     state = st;
00014     state->toBeFinished = false;
00015 }
00016 
00017 void WorkerMainLoop::operator() ()
00018 {
00019     boost::mutex::scoped_lock l1(mtx1);
00020     boost::mutex::scoped_lock l2(mtx2);
00021     l1.unlock();
00022     cond1.notify_one();
00023     while (!state->toBeFinished) {
00024         cond2.wait(l2); // <----- this is where the thread waits for a job to be started
00025         l1.lock();
00026         l1.unlock();
00027         cond1.notify_one();
00028         if (state->job && !state->toBeFinished) {
00029             (*state->job).start();
00030         }
00031     }
00032 }
00033 
00034 WorkerMainLoop::~WorkerMainLoop()
00035 {}
00036 
00037 WorkerState::WorkerState()
00038 {
00039     job = 0;
00040 }
00041 
00042 void WorkerState::finish()
00043 {
00044     toBeFinished = true ;
00045 }
00046 
00047 void WorkerState::assignJob(ThreadPoolJob &j)
00048 {
00049     job = &j;
00050 }
00051 
00052 boost::function0<void> _f_;
00053 
00054 ThreadPool::ThreadPool(int maxNumThreads) : _maxNumThreads(maxNumThreads)
00055 {
00056     threads.resize(_maxNumThreads);
00057     workerstates.resize(_maxNumThreads);
00058     cond1.resize(_maxNumThreads);
00059     cond2.resize(_maxNumThreads);
00060     mtx1.resize(_maxNumThreads);
00061     mtx2.resize(_maxNumThreads);
00062 
00063     for (int i = 0 ; i < _maxNumThreads ; i++) {
00064         cond1[i] = new condition ;
00065         cond2[i] = new condition ;
00066         mtx1[i] = new mutex;
00067         mtx2[i] = new mutex;
00068         workerstates[i] = new WorkerState();
00069         WorkerMainLoop worker(workerstates[i], *cond1[i], *mtx1[i], *cond2[i], *mtx2[i]);
00070         boost::function0<void> f;
00071         f = worker ;
00072         mutex::scoped_lock l(*mtx1[i]);
00073         try {
00074              threads[i] = new thread( f );
00075         } catch (boost::thread_resource_error &) {
00076             cout << "ERROR: Cannot start another thread" << endl;
00077         }
00078         cond1[i]->wait(l);
00079         mutex::scoped_lock l2(*mtx2[i]);
00080     }
00081 }
00082 
00083 int ThreadPool::dispatch(int id, ThreadPoolJob &f)
00084 {
00085     workerstates[id]->assignJob(f);
00086     boost::mutex::scoped_lock l1(*mtx1[id]);
00087     cond2[id]->notify_one();
00088     cond1[id]->wait(l1);
00089     return 0;
00090 }
00091 
00092 void ThreadPool::waitAll()
00093 {
00094     for (int i = 0 ; i < _maxNumThreads ; i++ )
00095         boost::mutex::scoped_lock l2(*mtx2[i]);
00096 }
00097 
00098 
00099 ThreadPool::~ThreadPool()
00100 {
00101     for (int i = 0 ; i < _maxNumThreads ; i++ ) {
00102         workerstates[i]->finish();
00103         cond2[i]->notify_one();
00104         threads[i]->join();
00105         delete workerstates[i];
00106         delete threads[i];
00107         delete cond1[i];
00108         delete cond2[i];
00109         delete mtx1[i];
00110         delete mtx2[i];
00111     }
00112 }

Generated on Wed Jul 9 16:34:39 2008 for PCSIM by doxygen 1.5.5