00001
00031 #include "linden_common.h"
00032 #include "llqueuedthread.h"
00033 #include "llstl.h"
00034
00035
00036
00037
00038 LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded) :
00039 LLThread(name),
00040 mThreaded(threaded),
00041 mIdleThread(TRUE),
00042 mNextHandle(0)
00043 {
00044 if (mThreaded)
00045 {
00046 start();
00047 }
00048 }
00049
00050
00051 LLQueuedThread::~LLQueuedThread()
00052 {
00053 shutdown();
00054
00055 }
00056
00057 void LLQueuedThread::shutdown()
00058 {
00059 setQuitting();
00060
00061 unpause();
00062 if (mThreaded)
00063 {
00064 S32 timeout = 100;
00065 for ( ; timeout>0; timeout--)
00066 {
00067 if (isStopped())
00068 {
00069 break;
00070 }
00071 ms_sleep(100);
00072 LLThread::yield();
00073 }
00074 if (timeout == 0)
00075 {
00076 llwarns << "~LLQueuedThread (" << mName << ") timed out!" << llendl;
00077 }
00078 }
00079 else
00080 {
00081 mStatus = STOPPED;
00082 }
00083
00084 QueuedRequest* req;
00085 S32 active_count = 0;
00086 while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
00087 {
00088 if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS)
00089 {
00090 ++active_count;
00091 }
00092 req->deleteRequest();
00093 }
00094 if (active_count)
00095 {
00096 llwarns << "~LLQueuedThread() called with active requests: " << active_count << llendl;
00097 }
00098 }
00099
00100
00101
00102
00103
00104 S32 LLQueuedThread::update(U32 max_time_ms)
00105 {
00106 return updateQueue(max_time_ms);
00107 }
00108
00109 S32 LLQueuedThread::updateQueue(U32 max_time_ms)
00110 {
00111 F64 max_time = (F64)max_time_ms * .001;
00112 LLTimer timer;
00113 S32 pending = 1;
00114
00115
00116 if (mThreaded)
00117 {
00118 pending = getPending();
00119 unpause();
00120 }
00121 else
00122 {
00123 while (pending > 0)
00124 {
00125 pending = processNextRequest();
00126 if (max_time && timer.getElapsedTimeF64() > max_time)
00127 break;
00128 }
00129 }
00130 return pending;
00131 }
00132
00133 void LLQueuedThread::incQueue()
00134 {
00135
00136 if (!isPaused())
00137 {
00138 if (mThreaded)
00139 {
00140 wake();
00141 }
00142 }
00143 }
00144
00145
00146
00147 S32 LLQueuedThread::getPending()
00148 {
00149 S32 res;
00150 lockData();
00151 res = mRequestQueue.size();
00152 unlockData();
00153 return res;
00154 }
00155
00156
00157 void LLQueuedThread::waitOnPending()
00158 {
00159 while(1)
00160 {
00161 update(0);
00162
00163 if (mIdleThread)
00164 {
00165 break;
00166 }
00167 if (mThreaded)
00168 {
00169 yield();
00170 }
00171 }
00172 return;
00173 }
00174
00175
00176 void LLQueuedThread::printQueueStats()
00177 {
00178 lockData();
00179 if (!mRequestQueue.empty())
00180 {
00181 QueuedRequest *req = *mRequestQueue.begin();
00182 llinfos << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << llendl;
00183 }
00184 else
00185 {
00186 llinfos << "Queued Thread Idle" << llendl;
00187 }
00188 unlockData();
00189 }
00190
00191
00192 LLQueuedThread::handle_t LLQueuedThread::generateHandle()
00193 {
00194 lockData();
00195 while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle)))
00196 {
00197 mNextHandle++;
00198 }
00199 unlockData();
00200 return mNextHandle++;
00201 }
00202
00203
00204 bool LLQueuedThread::addRequest(QueuedRequest* req)
00205 {
00206 if (mStatus == QUITTING)
00207 {
00208 return false;
00209 }
00210
00211 lockData();
00212 req->setStatus(STATUS_QUEUED);
00213 mRequestQueue.insert(req);
00214 mRequestHash.insert(req);
00215 #if _DEBUG
00216
00217 #endif
00218 unlockData();
00219
00220 incQueue();
00221
00222 return true;
00223 }
00224
00225
00226 bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
00227 {
00228 llassert (handle != nullHandle())
00229 bool res = false;
00230 bool waspaused = isPaused();
00231 bool done = false;
00232 while(!done)
00233 {
00234 update(0);
00235 lockData();
00236 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00237 if (!req)
00238 {
00239 done = true;
00240 }
00241 else if (req->getStatus() == STATUS_COMPLETE)
00242 {
00243 res = true;
00244 if (auto_complete)
00245 {
00246 mRequestHash.erase(handle);
00247 req->deleteRequest();
00248
00249 }
00250 done = true;
00251 }
00252 unlockData();
00253
00254 if (!done && mThreaded)
00255 {
00256 yield();
00257 }
00258 }
00259 if (waspaused)
00260 {
00261 pause();
00262 }
00263 return res;
00264 }
00265
00266
00267 LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
00268 {
00269 if (handle == nullHandle())
00270 {
00271 return 0;
00272 }
00273 lockData();
00274 QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
00275 unlockData();
00276 return res;
00277 }
00278
00279 LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
00280 {
00281 status_t res = STATUS_EXPIRED;
00282 lockData();
00283 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00284 if (req)
00285 {
00286 res = req->getStatus();
00287 }
00288 unlockData();
00289 return res;
00290 }
00291
00292 void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
00293 {
00294 lockData();
00295 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00296 if (req)
00297 {
00298 req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0));
00299 }
00300 unlockData();
00301 }
00302
00303
00304 void LLQueuedThread::setFlags(handle_t handle, U32 flags)
00305 {
00306 lockData();
00307 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00308 if (req)
00309 {
00310 req->setFlags(flags);
00311 }
00312 unlockData();
00313 }
00314
00315 void LLQueuedThread::setPriority(handle_t handle, U32 priority)
00316 {
00317 lockData();
00318 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00319 if (req)
00320 {
00321 if(req->getStatus() == STATUS_INPROGRESS)
00322 {
00323
00324 req->setPriority(priority);
00325 }
00326 else if(req->getStatus() == STATUS_QUEUED)
00327 {
00328
00329 llverify(mRequestQueue.erase(req) == 1);
00330 req->setPriority(priority);
00331 mRequestQueue.insert(req);
00332 }
00333 }
00334 unlockData();
00335 }
00336
00337 bool LLQueuedThread::completeRequest(handle_t handle)
00338 {
00339 bool res = false;
00340 lockData();
00341 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00342 if (req)
00343 {
00344 llassert_always(req->getStatus() != STATUS_QUEUED);
00345 llassert_always(req->getStatus() != STATUS_INPROGRESS);
00346 #if _DEBUG
00347
00348 #endif
00349 mRequestHash.erase(handle);
00350 req->deleteRequest();
00351
00352 res = true;
00353 }
00354 unlockData();
00355 return res;
00356 }
00357
00358 bool LLQueuedThread::check()
00359 {
00360 #if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging
00361 for (int i=0; i<REQUEST_HASH_SIZE; i++)
00362 {
00363 LLSimpleHashEntry<handle_t>* entry = mRequestHash.get_element_at_index(i);
00364 while (entry)
00365 {
00366 if (entry->getHashKey() > mNextHandle)
00367 {
00368 llerrs << "Hash Error" << llendl;
00369 return false;
00370 }
00371 entry = entry->getNextEntry();
00372 }
00373 }
00374 #endif
00375 return true;
00376 }
00377
00378
00379
00380
00381 S32 LLQueuedThread::processNextRequest()
00382 {
00383 QueuedRequest *req;
00384
00385 lockData();
00386 while(1)
00387 {
00388 req = NULL;
00389 if (mRequestQueue.empty())
00390 {
00391 break;
00392 }
00393 req = *mRequestQueue.begin();
00394 mRequestQueue.erase(mRequestQueue.begin());
00395 if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
00396 {
00397 req->setStatus(STATUS_ABORTED);
00398 req->finishRequest(false);
00399 if (req->getFlags() & FLAG_AUTO_COMPLETE)
00400 {
00401 mRequestHash.erase(req);
00402 req->deleteRequest();
00403
00404 }
00405 continue;
00406 }
00407 llassert_always(req->getStatus() == STATUS_QUEUED);
00408 break;
00409 }
00410 if (req)
00411 {
00412 req->setStatus(STATUS_INPROGRESS);
00413 }
00414 unlockData();
00415
00416
00417
00418
00419 if (req)
00420 {
00421
00422 bool complete = req->processRequest();
00423
00424 if (complete)
00425 {
00426 lockData();
00427 req->setStatus(STATUS_COMPLETE);
00428 req->finishRequest(true);
00429 if (req->getFlags() & FLAG_AUTO_COMPLETE)
00430 {
00431 mRequestHash.erase(req);
00432 req->deleteRequest();
00433
00434 }
00435 unlockData();
00436 }
00437 else
00438 {
00439 lockData();
00440 req->setStatus(STATUS_QUEUED);
00441 mRequestQueue.insert(req);
00442 U32 priority = req->getPriority();
00443 unlockData();
00444 if (priority < PRIORITY_NORMAL)
00445 {
00446 ms_sleep(1);
00447 }
00448 }
00449 }
00450
00451 S32 res;
00452 S32 pending = getPending();
00453 if (pending == 0)
00454 {
00455 if (isQuitting())
00456 {
00457 res = -1;
00458 }
00459 else
00460 {
00461 res = 0;
00462 }
00463 }
00464 else
00465 {
00466 res = pending;
00467 }
00468 return res;
00469 }
00470
00471 bool LLQueuedThread::runCondition()
00472 {
00473
00474 if (mRequestQueue.empty() && mIdleThread)
00475 return false;
00476 else
00477 return true;
00478 }
00479
00480 void LLQueuedThread::run()
00481 {
00482 while (1)
00483 {
00484
00485 checkPause();
00486
00487 if(isQuitting())
00488 break;
00489
00490
00491
00492 mIdleThread = FALSE;
00493
00494 int res = processNextRequest();
00495 if (res == 0)
00496 {
00497 mIdleThread = TRUE;
00498 }
00499
00500 if (res < 0)
00501 {
00502 break;
00503 }
00504
00505
00506 }
00507
00508 llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl;
00509 }
00510
00511
00512
00513 LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) :
00514 LLSimpleHashEntry<LLQueuedThread::handle_t>(handle),
00515 mStatus(STATUS_UNKNOWN),
00516 mPriority(priority),
00517 mFlags(flags)
00518 {
00519 }
00520
00521 LLQueuedThread::QueuedRequest::~QueuedRequest()
00522 {
00523 llassert_always(mStatus == STATUS_DELETE);
00524 }
00525
00526
00527 void LLQueuedThread::QueuedRequest::finishRequest(bool completed)
00528 {
00529 }
00530
00531
00532 void LLQueuedThread::QueuedRequest::deleteRequest()
00533 {
00534 llassert_always(mStatus != STATUS_INPROGRESS);
00535 setStatus(STATUS_DELETE);
00536 delete this;
00537 }