00001 #include <tjutils/tjthread.h>
00002 #include <tjutils/tjlog.h>
00003
00004
00005 template<typename In, typename Out>
00006 void ThreadedLoop<In,Out>::WorkThread::run() {
00007 Log<ThreadComponent> odinlog("WorkThread","run");
00008 while(1) {
00009
00010 process.wait();
00011 process.reset();
00012 ODINLOG(odinlog,normalDebug) << "catched process signal, cont=" << tloop->cont << STD_endl;
00013 if(!tloop->cont) break;
00014
00015 ODINLOG(odinlog,normalDebug) << "processing thread " << begin << "/" << end << STD_endl;
00016
00017 status=tloop->kernel(*tloop->in_cache, *out_cache, begin, end);
00018
00019 ODINLOG(odinlog,normalDebug) << "signaling finished=" << status << STD_endl;
00020 finished.signal();
00021
00022 if(!status) break;
00023 }
00024 }
00025
00027
00028
00029 template<typename In, typename Out>
00030 bool ThreadedLoop<In,Out>::init(unsigned int numof_threads, unsigned int loopsize) {
00031 Log<ThreadComponent> odinlog("ThreadedLoop","init");
00032 mainbegin=0;
00033 mainend=loopsize;
00034 #ifndef NO_THREADS
00035 destroy();
00036 ODINLOG(odinlog,normalDebug) << "numof_threads=" << numof_threads << STD_endl;
00037 if(numof_threads>1) {
00038 threads.resize(numof_threads-1);
00039 unsigned int onesize=loopsize/numof_threads;
00040 unsigned int rest=loopsize%numof_threads;
00041 unsigned int count=0;
00042 for(unsigned int i=0; i<(numof_threads-1); i++) {
00043 threads[i]=new WorkThread(this);
00044 threads[i]->begin=count;
00045 count+=onesize;
00046 if(i<rest) count++;
00047 threads[i]->end=count;
00048 threads[i]->start();
00049 }
00050 mainbegin=count;
00051 count+=onesize;
00052 if((numof_threads-1)<rest) count++;
00053 mainend=count;
00054 }
00055 #endif
00056 return true;
00057 }
00058
00059 template<typename In, typename Out>
00060 void ThreadedLoop<In,Out>::destroy() {
00061 Log<ThreadComponent> odinlog("ThreadedLoop","destroy");
00062 #ifndef NO_THREADS
00063 cont=false;
00064 for(unsigned int i=0; i<threads.size(); i++) {
00065 threads[i]->process.signal();
00066 threads[i]->wait();
00067 delete threads[i];
00068 }
00069 threads.resize(0);
00070 #endif
00071 }
00072
00073 template<typename In, typename Out>
00074 bool ThreadedLoop<In,Out>::execute(const In& in, STD_vector<Out>& outvec) {
00075 Log<ThreadComponent> odinlog("ThreadedLoop","execute");
00076 #ifdef NO_THREADS
00077 outvec.resize(1);
00078 return kernel(in, outvec[0], mainbegin, mainend);
00079 #else
00080
00081 unsigned int nthreads=threads.size();
00082
00083 outvec.resize(nthreads+1);
00084
00085 if(nthreads) {
00086 in_cache=∈
00087 cont=true;
00088
00089 for(unsigned int i=0; i<nthreads; i++) {
00090 threads[i]->out_cache=&(outvec[i]);
00091 threads[i]->status=true;
00092 threads[i]->process.signal();
00093 }
00094 }
00095
00096 bool result=kernel(in, outvec[nthreads], mainbegin, mainend);
00097
00098 if(nthreads) {
00099 for(unsigned int i=0; i<nthreads; i++) {
00100 threads[i]->finished.wait();
00101 threads[i]->finished.reset();
00102 if(!threads[i]->status) result=false;
00103 }
00104 ODINLOG(odinlog,normalDebug) << "finished.wait() done" << STD_endl;
00105 }
00106
00107 return result;
00108 #endif
00109 }
00110
00111