llqueuedthread.cpp

Go to the documentation of this file.
00001 
00031 #include "linden_common.h"
00032 #include "llqueuedthread.h"
00033 #include "llstl.h"
00034 
00035 //============================================================================
00036 
00037 // MAIN THREAD
00038 LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded) :
00039         LLThread(name),
00040         mThreaded(threaded),
00041         mIdleThread(TRUE),
00042         mNextHandle(0)
00043 {
00044         if (mThreaded)
00045         {
00046                 start();
00047         }
00048 }
00049 
00050 // MAIN THREAD
00051 LLQueuedThread::~LLQueuedThread()
00052 {
00053         shutdown();
00054         // ~LLThread() will be called here
00055 }
00056 
00057 void LLQueuedThread::shutdown()
00058 {
00059         setQuitting();
00060 
00061         unpause(); // MAIN THREAD
00062         if (mThreaded)
00063         {
00064                 S32 timeout = 100;
00065                 for ( ; timeout>0; timeout--)
00066                 {
00067                         if (isStopped())
00068                         {
00069                                 break;
00070                         }
00071                         ms_sleep(100);
00072                         LLThread::yield();
00073                 }
00074                 if (timeout == 0)
00075                 {
00076                         llwarns << "~LLQueuedThread (" << mName << ") timed out!" << llendl;
00077                 }
00078         }
00079         else
00080         {
00081                 mStatus = STOPPED;
00082         }
00083 
00084         QueuedRequest* req;
00085         S32 active_count = 0;
00086         while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
00087         {
00088                 if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS)
00089                 {
00090                         ++active_count;
00091                 }
00092                 req->deleteRequest();
00093         }
00094         if (active_count)
00095         {
00096                 llwarns << "~LLQueuedThread() called with active requests: " << active_count << llendl;
00097         }
00098 }
00099 
00100 //----------------------------------------------------------------------------
00101 
00102 // MAIN THREAD
00103 // virtual
00104 S32 LLQueuedThread::update(U32 max_time_ms)
00105 {
00106         return updateQueue(max_time_ms);
00107 }
00108 
00109 S32 LLQueuedThread::updateQueue(U32 max_time_ms)
00110 {
00111         F64 max_time = (F64)max_time_ms * .001;
00112         LLTimer timer;
00113         S32 pending = 1;
00114 
00115         // Frame Update
00116         if (mThreaded)
00117         {
00118                 pending = getPending();
00119                 unpause();
00120         }
00121         else
00122         {
00123                 while (pending > 0)
00124                 {
00125                         pending = processNextRequest();
00126                         if (max_time && timer.getElapsedTimeF64() > max_time)
00127                                 break;
00128                 }
00129         }
00130         return pending;
00131 }
00132 
00133 void LLQueuedThread::incQueue()
00134 {
00135         // Something has been added to the queue
00136         if (!isPaused())
00137         {
00138                 if (mThreaded)
00139                 {
00140                         wake(); // Wake the thread up if necessary.
00141                 }
00142         }
00143 }
00144 
00145 //virtual
00146 // May be called from any thread
00147 S32 LLQueuedThread::getPending()
00148 {
00149         S32 res;
00150         lockData();
00151         res = mRequestQueue.size();
00152         unlockData();
00153         return res;
00154 }
00155 
00156 // MAIN thread
00157 void LLQueuedThread::waitOnPending()
00158 {
00159         while(1)
00160         {
00161                 update(0);
00162 
00163                 if (mIdleThread)
00164                 {
00165                         break;
00166                 }
00167                 if (mThreaded)
00168                 {
00169                         yield();
00170                 }
00171         }
00172         return;
00173 }
00174 
00175 // MAIN thread
00176 void LLQueuedThread::printQueueStats()
00177 {
00178         lockData();
00179         if (!mRequestQueue.empty())
00180         {
00181                 QueuedRequest *req = *mRequestQueue.begin();
00182                 llinfos << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << llendl;
00183         }
00184         else
00185         {
00186                 llinfos << "Queued Thread Idle" << llendl;
00187         }
00188         unlockData();
00189 }
00190 
00191 // MAIN thread
00192 LLQueuedThread::handle_t LLQueuedThread::generateHandle()
00193 {
00194         lockData();
00195         while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle)))
00196         {
00197                 mNextHandle++;
00198         }
00199         unlockData();
00200         return mNextHandle++;
00201 }
00202 
00203 // MAIN thread
00204 bool LLQueuedThread::addRequest(QueuedRequest* req)
00205 {
00206         if (mStatus == QUITTING)
00207         {
00208                 return false;
00209         }
00210         
00211         lockData();
00212         req->setStatus(STATUS_QUEUED);
00213         mRequestQueue.insert(req);
00214         mRequestHash.insert(req);
00215 #if _DEBUG
00216 //      llinfos << llformat("LLQueuedThread::Added req [%08d]",handle) << llendl;
00217 #endif
00218         unlockData();
00219 
00220         incQueue();
00221 
00222         return true;
00223 }
00224 
00225 // MAIN thread
00226 bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
00227 {
00228         llassert (handle != nullHandle())
00229         bool res = false;
00230         bool waspaused = isPaused();
00231         bool done = false;
00232         while(!done)
00233         {
00234                 update(0); // unpauses
00235                 lockData();
00236                 QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00237                 if (!req)
00238                 {
00239                         done = true; // request does not exist
00240                 }
00241                 else if (req->getStatus() == STATUS_COMPLETE)
00242                 {
00243                         res = true;
00244                         if (auto_complete)
00245                         {
00246                                 mRequestHash.erase(handle);
00247                                 req->deleteRequest();
00248 //                              check();
00249                         }
00250                         done = true;
00251                 }
00252                 unlockData();
00253                 
00254                 if (!done && mThreaded)
00255                 {
00256                         yield();
00257                 }
00258         }
00259         if (waspaused)
00260         {
00261                 pause();
00262         }
00263         return res;
00264 }
00265 
00266 // MAIN thread
00267 LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
00268 {
00269         if (handle == nullHandle())
00270         {
00271                 return 0;
00272         }
00273         lockData();
00274         QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
00275         unlockData();
00276         return res;
00277 }
00278 
00279 LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
00280 {
00281         status_t res = STATUS_EXPIRED;
00282         lockData();
00283         QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00284         if (req)
00285         {
00286                 res = req->getStatus();
00287         }
00288         unlockData();
00289         return res;
00290 }
00291 
00292 void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
00293 {
00294         lockData();
00295         QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00296         if (req)
00297         {
00298                 req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0));
00299         }
00300         unlockData();
00301 }
00302 
00303 // MAIN thread
00304 void LLQueuedThread::setFlags(handle_t handle, U32 flags)
00305 {
00306         lockData();
00307         QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00308         if (req)
00309         {
00310                 req->setFlags(flags);
00311         }
00312         unlockData();
00313 }
00314 
00315 void LLQueuedThread::setPriority(handle_t handle, U32 priority)
00316 {
00317         lockData();
00318         QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00319         if (req)
00320         {
00321                 if(req->getStatus() == STATUS_INPROGRESS)
00322                 {
00323                         // not in list
00324                         req->setPriority(priority);
00325                 }
00326                 else if(req->getStatus() == STATUS_QUEUED)
00327                 {
00328                         // remove from list then re-insert
00329                         llverify(mRequestQueue.erase(req) == 1);
00330                         req->setPriority(priority);
00331                         mRequestQueue.insert(req);
00332                 }
00333         }
00334         unlockData();
00335 }
00336 
00337 bool LLQueuedThread::completeRequest(handle_t handle)
00338 {
00339         bool res = false;
00340         lockData();
00341         QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
00342         if (req)
00343         {
00344                 llassert_always(req->getStatus() != STATUS_QUEUED);
00345                 llassert_always(req->getStatus() != STATUS_INPROGRESS);
00346 #if _DEBUG
00347 //              llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl;
00348 #endif
00349                 mRequestHash.erase(handle);
00350                 req->deleteRequest();
00351 //              check();
00352                 res = true;
00353         }
00354         unlockData();
00355         return res;
00356 }
00357 
00358 bool LLQueuedThread::check()
00359 {
00360 #if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging
00361         for (int i=0; i<REQUEST_HASH_SIZE; i++)
00362         {
00363                 LLSimpleHashEntry<handle_t>* entry = mRequestHash.get_element_at_index(i);
00364                 while (entry)
00365                 {
00366                         if (entry->getHashKey() > mNextHandle)
00367                         {
00368                                 llerrs << "Hash Error" << llendl;
00369                                 return false;
00370                         }
00371                         entry = entry->getNextEntry();
00372                 }
00373         }
00374 #endif
00375         return true;
00376 }               
00377         
00378 //============================================================================
00379 // Runs on its OWN thread
00380 
00381 S32 LLQueuedThread::processNextRequest()
00382 {
00383         QueuedRequest *req;
00384         // Get next request from pool
00385         lockData();
00386         while(1)
00387         {
00388                 req = NULL;
00389                 if (mRequestQueue.empty())
00390                 {
00391                         break;
00392                 }
00393                 req = *mRequestQueue.begin();
00394                 mRequestQueue.erase(mRequestQueue.begin());
00395                 if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
00396                 {
00397                         req->setStatus(STATUS_ABORTED);
00398                         req->finishRequest(false);
00399                         if (req->getFlags() & FLAG_AUTO_COMPLETE)
00400                         {
00401                                 mRequestHash.erase(req);
00402                                 req->deleteRequest();
00403 //                              check();
00404                         }
00405                         continue;
00406                 }
00407                 llassert_always(req->getStatus() == STATUS_QUEUED);
00408                 break;
00409         }
00410         if (req)
00411         {
00412                 req->setStatus(STATUS_INPROGRESS);
00413         }
00414         unlockData();
00415 
00416         // This is the only place we will call req->setStatus() after
00417         // it has initially been seet to STATUS_QUEUED, so it is
00418         // safe to access req.
00419         if (req)
00420         {
00421                 // process request
00422                 bool complete = req->processRequest();
00423 
00424                 if (complete)
00425                 {
00426                         lockData();
00427                         req->setStatus(STATUS_COMPLETE);
00428                         req->finishRequest(true);
00429                         if (req->getFlags() & FLAG_AUTO_COMPLETE)
00430                         {
00431                                 mRequestHash.erase(req);
00432                                 req->deleteRequest();
00433 //                              check();
00434                         }
00435                         unlockData();
00436                 }
00437                 else
00438                 {
00439                         lockData();
00440                         req->setStatus(STATUS_QUEUED);
00441                         mRequestQueue.insert(req);
00442                         U32 priority = req->getPriority();
00443                         unlockData();
00444                         if (priority < PRIORITY_NORMAL)
00445                         {
00446                                 ms_sleep(1); // sleep the thread a little
00447                         }
00448                 }
00449         }
00450 
00451         S32 res;
00452         S32 pending = getPending();
00453         if (pending == 0)
00454         {
00455                 if (isQuitting())
00456                 {
00457                         res = -1; // exit thread
00458                 }
00459                 else
00460                 {
00461                         res = 0;
00462                 }
00463         }
00464         else
00465         {
00466                 res = pending;
00467         }
00468         return res;
00469 }
00470 
00471 bool LLQueuedThread::runCondition()
00472 {
00473         // mRunCondition must be locked here
00474         if (mRequestQueue.empty() && mIdleThread)
00475                 return false;
00476         else
00477                 return true;
00478 }
00479 
00480 void LLQueuedThread::run()
00481 {
00482         while (1)
00483         {
00484                 // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
00485                 checkPause();
00486                 
00487                 if(isQuitting())
00488                         break;
00489 
00490                 //llinfos << "QUEUED THREAD RUNNING, queue size = " << mRequestQueue.size() << llendl;
00491 
00492                 mIdleThread = FALSE;
00493                 
00494                 int res = processNextRequest();
00495                 if (res == 0)
00496                 {
00497                         mIdleThread = TRUE;
00498                 }
00499                 
00500                 if (res < 0) // finished working and want to exit
00501                 {
00502                         break;
00503                 }
00504 
00505                 //LLThread::yield(); // thread should yield after each request          
00506         }
00507 
00508         llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl;
00509 }
00510 
00511 //============================================================================
00512 
00513 LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) :
00514         LLSimpleHashEntry<LLQueuedThread::handle_t>(handle),
00515         mStatus(STATUS_UNKNOWN),
00516         mPriority(priority),
00517         mFlags(flags)
00518 {
00519 }
00520 
00521 LLQueuedThread::QueuedRequest::~QueuedRequest()
00522 {
00523         llassert_always(mStatus == STATUS_DELETE);
00524 }
00525 
00526 //virtual
00527 void LLQueuedThread::QueuedRequest::finishRequest(bool completed)
00528 {
00529 }
00530 
00531 //virtual
00532 void LLQueuedThread::QueuedRequest::deleteRequest()
00533 {
00534         llassert_always(mStatus != STATUS_INPROGRESS);
00535         setStatus(STATUS_DELETE);
00536         delete this;
00537 }

Generated on Thu Jul 1 06:09:04 2010 for Second Life Viewer by  doxygen 1.4.7