00001
00031 #include "linden_common.h"
00032 #include "llworkerthread.h"
00033 #include "llstl.h"
00034
00035 #if USE_FRAME_CALLBACK_MANAGER
00036 #include "llframecallbackmanager.h"
00037 #endif
00038
00039
00040
00041
00042 LLWorkerThread::LLWorkerThread(const std::string& name, bool threaded) :
00043 LLQueuedThread(name, threaded),
00044 mWorkerAPRPoolp(NULL)
00045 {
00046 apr_pool_create(&mWorkerAPRPoolp, NULL);
00047 mDeleteMutex = new LLMutex(getAPRPool());
00048 }
00049
00050 LLWorkerThread::~LLWorkerThread()
00051 {
00052
00053 if (!mDeleteList.empty())
00054 {
00055 llwarns << "Worker Thread: " << mName << " destroyed with " << mDeleteList.size()
00056 << " entries in delete list." << llendl;
00057 }
00058
00059 delete mDeleteMutex;
00060
00061
00062 }
00063
00064
00065 S32 LLWorkerThread::update(U32 max_time_ms)
00066 {
00067 S32 res = LLQueuedThread::update(max_time_ms);
00068
00069 std::vector<LLWorkerClass*> delete_list;
00070 std::vector<LLWorkerClass*> abort_list;
00071 mDeleteMutex->lock();
00072 for (delete_list_t::iterator iter = mDeleteList.begin();
00073 iter != mDeleteList.end(); )
00074 {
00075 delete_list_t::iterator curiter = iter++;
00076 LLWorkerClass* worker = *curiter;
00077 if (worker->deleteOK())
00078 {
00079 if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED))
00080 {
00081 delete_list.push_back(worker);
00082 mDeleteList.erase(curiter);
00083 }
00084 else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED))
00085 {
00086 abort_list.push_back(worker);
00087 }
00088 }
00089 }
00090 mDeleteMutex->unlock();
00091
00092 for (std::vector<LLWorkerClass*>::iterator iter = abort_list.begin();
00093 iter != abort_list.end(); ++iter)
00094 {
00095 (*iter)->abortWork(false);
00096 }
00097 for (std::vector<LLWorkerClass*>::iterator iter = delete_list.begin();
00098 iter != delete_list.end(); ++iter)
00099 {
00100 LLWorkerClass* worker = *iter;
00101 if (worker->mRequestHandle)
00102 {
00103
00104 completeRequest(worker->mRequestHandle);
00105 worker->mRequestHandle = LLWorkerThread::nullHandle();
00106 worker->clearFlags(LLWorkerClass::WCF_HAVE_WORK);
00107 }
00108 delete *iter;
00109 }
00110
00111 res += delete_list.size() + abort_list.size();
00112 return res;
00113 }
00114
00115
00116
00117 LLWorkerThread::handle_t LLWorkerThread::addWorkRequest(LLWorkerClass* workerclass, S32 param, U32 priority)
00118 {
00119 handle_t handle = generateHandle();
00120
00121 WorkRequest* req = new WorkRequest(handle, priority, workerclass, param);
00122
00123 bool res = addRequest(req);
00124 if (!res)
00125 {
00126 llerrs << "add called after LLWorkerThread::cleanupClass()" << llendl;
00127 req->deleteRequest();
00128 handle = nullHandle();
00129 }
00130
00131 return handle;
00132 }
00133
00134 void LLWorkerThread::deleteWorker(LLWorkerClass* workerclass)
00135 {
00136 mDeleteMutex->lock();
00137 mDeleteList.push_back(workerclass);
00138 mDeleteMutex->unlock();
00139 }
00140
00141
00142
00143
00144 LLWorkerThread::WorkRequest::WorkRequest(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) :
00145 LLQueuedThread::QueuedRequest(handle, priority),
00146 mWorkerClass(workerclass),
00147 mParam(param)
00148 {
00149 }
00150
00151 LLWorkerThread::WorkRequest::~WorkRequest()
00152 {
00153 }
00154
00155
00156 void LLWorkerThread::WorkRequest::deleteRequest()
00157 {
00158 LLQueuedThread::QueuedRequest::deleteRequest();
00159 }
00160
00161
00162 bool LLWorkerThread::WorkRequest::processRequest()
00163 {
00164 LLWorkerClass* workerclass = getWorkerClass();
00165 workerclass->setWorking(true);
00166 bool complete = workerclass->doWork(getParam());
00167 workerclass->setWorking(false);
00168 return complete;
00169 }
00170
00171
00172 void LLWorkerThread::WorkRequest::finishRequest(bool completed)
00173 {
00174 LLWorkerClass* workerclass = getWorkerClass();
00175 workerclass->finishWork(getParam(), completed);
00176 U32 flags = LLWorkerClass::WCF_WORK_FINISHED | (completed ? 0 : LLWorkerClass::WCF_WORK_ABORTED);
00177 workerclass->setFlags(flags);
00178 }
00179
00180
00181
00182
00183 LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name)
00184 : mWorkerThread(workerthread),
00185 mWorkerClassName(name),
00186 mRequestHandle(LLWorkerThread::nullHandle()),
00187 mMutex(workerthread->getWorkerAPRPool()),
00188 mWorkFlags(0)
00189 {
00190 if (!mWorkerThread)
00191 {
00192 llerrs << "LLWorkerClass() called with NULL workerthread: " << name << llendl;
00193 }
00194 }
00195
00196 LLWorkerClass::~LLWorkerClass()
00197 {
00198 llassert_always(!(mWorkFlags & WCF_WORKING));
00199 llassert_always(mWorkFlags & WCF_DELETE_REQUESTED);
00200 if (mRequestHandle != LLWorkerThread::nullHandle())
00201 {
00202 LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle);
00203 if (!workreq)
00204 {
00205 llerrs << "LLWorkerClass destroyed with stale work handle" << llendl;
00206 }
00207 if (workreq->getStatus() != LLWorkerThread::STATUS_ABORTED &&
00208 workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE)
00209 {
00210 llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl;
00211 }
00212 }
00213 }
00214
00215 void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread)
00216 {
00217 mMutex.lock();
00218 if (mRequestHandle != LLWorkerThread::nullHandle())
00219 {
00220 llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl;
00221 }
00222 mWorkerThread = workerthread;
00223 mMutex.unlock();
00224 }
00225
00226
00227
00228
00229 void LLWorkerClass::finishWork(S32 param, bool success)
00230 {
00231 }
00232
00233
00234 bool LLWorkerClass::deleteOK()
00235 {
00236 return true;
00237 }
00238
00239
00240
00241
00242 void LLWorkerClass::setWorking(bool working)
00243 {
00244 mMutex.lock();
00245 if (working)
00246 {
00247 llassert_always(!(mWorkFlags & WCF_WORKING));
00248 setFlags(WCF_WORKING);
00249 }
00250 else
00251 {
00252 llassert_always((mWorkFlags & WCF_WORKING));
00253 clearFlags(WCF_WORKING);
00254 }
00255 mMutex.unlock();
00256 }
00257
00258
00259
00260 bool LLWorkerClass::yield()
00261 {
00262 LLThread::yield();
00263 mWorkerThread->checkPause();
00264 bool res;
00265 mMutex.lock();
00266 res = (getFlags() & WCF_ABORT_REQUESTED) ? true : false;
00267 mMutex.unlock();
00268 return res;
00269 }
00270
00271
00272
00273
00274 void LLWorkerClass::addWork(S32 param, U32 priority)
00275 {
00276 mMutex.lock();
00277 llassert_always(!(mWorkFlags & (WCF_WORKING|WCF_HAVE_WORK)));
00278 if (mRequestHandle != LLWorkerThread::nullHandle())
00279 {
00280 llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl;
00281 }
00282 #if _DEBUG
00283
00284 #endif
00285 startWork(param);
00286 clearFlags(WCF_WORK_FINISHED|WCF_WORK_ABORTED);
00287 setFlags(WCF_HAVE_WORK);
00288 mRequestHandle = mWorkerThread->addWorkRequest(this, param, priority);
00289 mMutex.unlock();
00290 }
00291
00292 void LLWorkerClass::abortWork(bool autocomplete)
00293 {
00294 mMutex.lock();
00295 #if _DEBUG
00296
00297
00298
00299 #endif
00300 if (mRequestHandle != LLWorkerThread::nullHandle())
00301 {
00302 mWorkerThread->abortRequest(mRequestHandle, autocomplete);
00303 mWorkerThread->setPriority(mRequestHandle, LLQueuedThread::PRIORITY_IMMEDIATE);
00304 setFlags(WCF_ABORT_REQUESTED);
00305 }
00306 mMutex.unlock();
00307 }
00308
00309
00310 bool LLWorkerClass::checkWork(bool aborting)
00311 {
00312 LLMutexLock lock(&mMutex);
00313 bool complete = false, abort = false;
00314 if (mRequestHandle != LLWorkerThread::nullHandle())
00315 {
00316 LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle);
00317 llassert_always(workreq);
00318 LLQueuedThread::status_t status = workreq->getStatus();
00319 if (status == LLWorkerThread::STATUS_ABORTED)
00320 {
00321 complete = true;
00322 abort = true;
00323 }
00324 else if (status == LLWorkerThread::STATUS_COMPLETE)
00325 {
00326 complete = true;
00327 }
00328 else
00329 {
00330 llassert_always(!aborting || (workreq->getFlags() & LLQueuedThread::FLAG_ABORT));
00331 }
00332 if (complete)
00333 {
00334 llassert_always(!(getFlags(WCF_WORKING)));
00335 endWork(workreq->getParam(), abort);
00336 mWorkerThread->completeRequest(mRequestHandle);
00337 mRequestHandle = LLWorkerThread::nullHandle();
00338 clearFlags(WCF_HAVE_WORK);
00339 }
00340 }
00341 else
00342 {
00343 complete = true;
00344 }
00345 return complete;
00346 }
00347
00348 void LLWorkerClass::scheduleDelete()
00349 {
00350 bool do_delete = false;
00351 mMutex.lock();
00352 if (!(getFlags(WCF_DELETE_REQUESTED)))
00353 {
00354 setFlags(WCF_DELETE_REQUESTED);
00355 do_delete = true;
00356 }
00357 mMutex.unlock();
00358 if (do_delete)
00359 {
00360 mWorkerThread->deleteWorker(this);
00361 }
00362 }
00363
00364 void LLWorkerClass::setPriority(U32 priority)
00365 {
00366 mMutex.lock();
00367 if (mRequestHandle != LLWorkerThread::nullHandle())
00368 {
00369 mRequestPriority = priority;
00370 mWorkerThread->setPriority(mRequestHandle, priority);
00371 }
00372 mMutex.unlock();
00373 }
00374
00375
00376