llpumpio.cpp

Go to the documentation of this file.
00001 
00034 #include "linden_common.h"
00035 #include "llpumpio.h"
00036 
00037 #include <set>
00038 #include "apr-1/apr_poll.h"
00039 
00040 #include "llapr.h"
00041 #include "llmemtype.h"
00042 #include "llstl.h"
00043 
00044 // This should not be in production, but it is intensely useful during
00045 // development.
00046 #if LL_LINUX
00047 #define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
00048 #endif
00049 
00050 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00051 #include <typeinfo>
00052 #endif
00053 
00054 // constants for poll timeout. if we are threading, we want to have a
00055 // longer poll timeout.
00056 #if LL_THREADS_APR
00057 static const S32 DEFAULT_POLL_TIMEOUT = 1000;
00058 #else
00059 static const S32 DEFAULT_POLL_TIMEOUT = 0;
00060 #endif
00061 
00062 // The default (and fallback) expiration time for chains
00063 const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
00064 extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
00065 extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
00066 
00067 // sorta spammy debug modes.
00068 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1
00069 //#define LL_DEBUG_PROCESS_LINK 1
00070 //#define LL_DEBUG_PROCESS_RETURN_VALUE 1
00071 
00072 // Super spammy debug mode.
00073 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
00074 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
00075 
00079 class LLChainSleeper : public LLRunnable
00080 {
00081 public:
00082         static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
00083         {
00084                 return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
00085         }
00086         
00087         virtual void run(LLRunner* runner, S64 handle)
00088         {
00089                 mPump->clearLock(mKey);
00090         }
00091 
00092 protected:
00093         LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
00094         LLPumpIO* mPump;
00095         S32 mKey;
00096 };
00097 
00098 
00103 struct ll_delete_apr_pollset_fd_client_data
00104 {
00105         typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
00106         void operator()(const pipe_conditional_t& conditional)
00107         {
00108                 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00109                 S32* client_id = (S32*)conditional.second.client_data;
00110                 delete client_id;
00111         }
00112 };
00113 
00117 LLPumpIO::LLPumpIO(apr_pool_t* pool) :
00118         mState(LLPumpIO::NORMAL),
00119         mRebuildPollset(false),
00120         mPollset(NULL),
00121         mPollsetClientID(0),
00122         mNextLock(0),
00123         mPool(NULL),
00124         mCurrentPool(NULL),
00125         mCurrentPoolReallocCount(0),
00126         mChainsMutex(NULL),
00127         mCallbackMutex(NULL)
00128 {
00129         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00130         initialize(pool);
00131 }
00132 
00133 LLPumpIO::~LLPumpIO()
00134 {
00135         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00136         cleanup();
00137 }
00138 
00139 bool LLPumpIO::prime(apr_pool_t* pool)
00140 {
00141         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00142         cleanup();
00143         initialize(pool);
00144         return ((pool == NULL) ? false : true);
00145 }
00146 
00147 bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
00148 {
00149         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00150         if(chain.empty()) return false;
00151 
00152 #if LL_THREADS_APR
00153         LLScopedLock lock(mChainsMutex);
00154 #endif
00155         LLChainInfo info;
00156         info.setTimeoutSeconds(timeout);
00157         info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
00158         LLLinkInfo link;
00159 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00160         lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
00161                 << typeid(*(chain[0])).name() << "'" << llendl;
00162 #else
00163         lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
00164 #endif
00165         chain_t::const_iterator it = chain.begin();
00166         chain_t::const_iterator end = chain.end();
00167         for(; it != end; ++it)
00168         {
00169                 link.mPipe = (*it);
00170                 link.mChannels = info.mData->nextChannel();
00171                 info.mChainLinks.push_back(link);
00172         }
00173         mPendingChains.push_back(info);
00174         return true;
00175 }
00176 
00177 bool LLPumpIO::addChain(
00178         const LLPumpIO::links_t& links,
00179         LLIOPipe::buffer_ptr_t data,
00180         LLSD context,
00181         F32 timeout)
00182 {
00183         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00184 
00185         // remember that if the caller is providing a full link
00186         // description, we need to have that description matched to a
00187         // particular buffer.
00188         if(!data) return false;
00189         if(links.empty()) return false;
00190 
00191 #if LL_THREADS_APR
00192         LLScopedLock lock(mChainsMutex);
00193 #endif
00194 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00195         lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
00196                 << typeid(*(links[0].mPipe)).name() << "'" << llendl;
00197 #else
00198         lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
00199 #endif
00200         LLChainInfo info;
00201         info.setTimeoutSeconds(timeout);
00202         info.mChainLinks = links;
00203         info.mData = data;
00204         info.mContext = context;
00205         mPendingChains.push_back(info);
00206         return true;
00207 }
00208 
00209 bool LLPumpIO::setTimeoutSeconds(F32 timeout)
00210 {
00211         // If no chain is running, return failure.
00212         if(current_chain_t() == mCurrentChain)
00213         {
00214                 return false;
00215         }
00216         (*mCurrentChain).setTimeoutSeconds(timeout);
00217         return true;
00218 }
00219 
00220 bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
00221 {
00222         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00223         //lldebugs << "LLPumpIO::setConditional" << llendl;
00224         if(pipe)
00225         {
00226                 // remove any matching poll file descriptors for this pipe.
00227                 LLIOPipe::ptr_t pipe_ptr(pipe);
00228                 LLChainInfo::conditionals_t::iterator it;
00229                 it = (*mCurrentChain).mDescriptors.begin();
00230                 while(it != (*mCurrentChain).mDescriptors.end())
00231                 {
00232                         LLChainInfo::pipe_conditional_t& value = (*it);
00233                         if(pipe_ptr == value.first)
00234                         {
00235                                 ll_delete_apr_pollset_fd_client_data()(value);
00236                                 it = (*mCurrentChain).mDescriptors.erase(it);
00237                                 mRebuildPollset = true;
00238                         }
00239                         else
00240                         {
00241                                 ++it;
00242                         }
00243                 }
00244 
00245                 if(poll)
00246                 {
00247                         LLChainInfo::pipe_conditional_t value;
00248                         value.first = pipe_ptr;
00249                         value.second = *poll;
00250                         if(!poll->p)
00251                         {
00252                                 // each fd needs a pool to work with, so if one was
00253                                 // not specified, use this pool.
00254                                 // *FIX: Should it always be this pool?
00255                                 value.second.p = mPool;
00256                         }
00257                         value.second.client_data = new S32(++mPollsetClientID);
00258                         (*mCurrentChain).mDescriptors.push_back(value);
00259                         mRebuildPollset = true;
00260                 }
00261                 return true;
00262         }
00263         return false;
00264 }
00265 
00266 S32 LLPumpIO::setLock()
00267 {
00268         // *NOTE: I do not think it is necessary to acquire a mutex here
00269         // since this should only be called during the pump(), and should
00270         // only change the running chain. Any other use of this method is
00271         // incorrect usage. If it becomes necessary to acquire a lock
00272         // here, be sure to lock here and call a protected method to get
00273         // the lock, and sleepChain() should probably acquire the same
00274         // lock while and calling the same protected implementation to
00275         // lock the runner at the same time.
00276 
00277         // If no chain is running, return failure.
00278         if(current_chain_t() == mCurrentChain)
00279         {
00280                 return 0;
00281         }
00282 
00283         // deal with wrap.
00284         if(++mNextLock <= 0)
00285         {
00286                 mNextLock = 1;
00287         }
00288 
00289         // set the lock
00290         (*mCurrentChain).mLock = mNextLock;
00291         return mNextLock;
00292 }
00293 
00294 void LLPumpIO::clearLock(S32 key)
00295 {
00296         // We need to lock it here since we do not want to be iterating
00297         // over the chains twice. We can safely call process() while this
00298         // is happening since we should not be erasing a locked pipe, and
00299         // therefore won't be treading into deleted memory. I think we can
00300         // also clear the lock on the chain safely since the pump only
00301         // reads that value.
00302 #if LL_THREADS_APR
00303         LLScopedLock lock(mChainsMutex);
00304 #endif
00305         mClearLocks.insert(key);
00306 }
00307 
00308 bool LLPumpIO::sleepChain(F64 seconds)
00309 {
00310         // Much like the call to setLock(), this should only be called
00311         // from one chain during processing, so there is no need to
00312         // acquire a mutex.
00313         if(seconds <= 0.0) return false;
00314         S32 key = setLock();
00315         if(!key) return false;
00316         LLRunner::run_handle_t handle = mRunner.addRunnable(
00317                 LLChainSleeper::build(this, key),
00318                 LLRunner::RUN_IN,
00319                 seconds);
00320         if(0 == handle) return false;
00321         return true;
00322 }
00323 
00324 bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
00325 {
00326         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00327         if(current_chain_t() == mCurrentChain)
00328         {
00329                 return false;
00330         }
00331         std::copy(
00332                 (*mCurrentChain).mChainLinks.begin(),
00333                 (*mCurrentChain).mChainLinks.end(),
00334                 std::back_insert_iterator<links_t>(links));
00335         return true;
00336 }
00337 
00338 void LLPumpIO::pump()
00339 {
00340         pump(DEFAULT_POLL_TIMEOUT);
00341 }
00342 
00343 //timeout is in microseconds
00344 void LLPumpIO::pump(const S32& poll_timeout)
00345 {
00346         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00347         LLFastTimer t1(LLFastTimer::FTM_PUMP);
00348         //llinfos << "LLPumpIO::pump()" << llendl;
00349 
00350         // Run any pending runners.
00351         mRunner.run();
00352 
00353         // We need to move all of the pending heads over to the running
00354         // chains.
00355         PUMP_DEBUG;
00356         if(true)
00357         {
00358 #if LL_THREADS_APR
00359                 LLScopedLock lock(mChainsMutex);
00360 #endif
00361                 // bail if this pump is paused.
00362                 if(PAUSING == mState)
00363                 {
00364                         mState = PAUSED;
00365                 }
00366                 if(PAUSED == mState)
00367                 {
00368                         return;
00369                 }
00370 
00371                 PUMP_DEBUG;
00372                 // Move the pending chains over to the running chaings
00373                 if(!mPendingChains.empty())
00374                 {
00375                         PUMP_DEBUG;
00376                         //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl;
00377                         std::copy(
00378                                 mPendingChains.begin(),
00379                                 mPendingChains.end(),
00380                                 std::back_insert_iterator<running_chains_t>(mRunningChains));
00381                         mPendingChains.clear();
00382                         PUMP_DEBUG;
00383                 }
00384 
00385                 // Clear any locks. This needs to be done here so that we do
00386                 // not clash during a call to clearLock().
00387                 if(!mClearLocks.empty())
00388                 {
00389                         PUMP_DEBUG;
00390                         running_chains_t::iterator it = mRunningChains.begin();
00391                         running_chains_t::iterator end = mRunningChains.end();
00392                         std::set<S32>::iterator not_cleared = mClearLocks.end();
00393                         for(; it != end; ++it)
00394                         {
00395                                 if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
00396                                 {
00397                                         (*it).mLock = 0;
00398                                 }
00399                         }
00400                         PUMP_DEBUG;
00401                         mClearLocks.clear();
00402                 }
00403         }
00404 
00405         PUMP_DEBUG;
00406         // rebuild the pollset if necessary
00407         if(mRebuildPollset)
00408         {
00409                 PUMP_DEBUG;
00410                 rebuildPollset();
00411                 mRebuildPollset = false;
00412         }
00413 
00414         // Poll based on the last known pollset
00415         // *FIX: may want to pass in a poll timeout so it works correctly
00416         // in single and multi threaded processes.
00417         PUMP_DEBUG;
00418         typedef std::set<S32> signal_client_t;
00419         signal_client_t signalled_client;
00420         if(mPollset)
00421         {
00422                 PUMP_DEBUG;
00423                 //llinfos << "polling" << llendl;
00424                 S32 count = 0;
00425                 S32 client_id = 0;
00426                 const apr_pollfd_t* poll_fd = NULL;
00427                 apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
00428                 PUMP_DEBUG;
00429                 for(S32 i = 0; i < count; ++i)
00430                 {
00431                         client_id = *((S32*)poll_fd[i].client_data);
00432                         signalled_client.insert(client_id);
00433                 }
00434                 PUMP_DEBUG;
00435         }
00436 
00437         PUMP_DEBUG;
00438         // set up for a check to see if each one was signalled
00439         signal_client_t::iterator not_signalled = signalled_client.end();
00440 
00441         // Process everything as appropriate
00442         //lldebugs << "Running chain count: " << mRunningChains.size() << llendl;
00443         running_chains_t::iterator run_chain = mRunningChains.begin();
00444         bool process_this_chain = false;
00445         for(; run_chain != mRunningChains.end(); )
00446         {
00447                 PUMP_DEBUG;
00448                 if((*run_chain).mInit
00449                    && (*run_chain).mTimer.getStarted()
00450                    && (*run_chain).mTimer.hasExpired())
00451                 {
00452                         PUMP_DEBUG;
00453                         if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
00454                         {
00455                                 // the pipe probably handled the error. If the handler
00456                                 // forgot to reset the expiration then we need to do
00457                                 // that here.
00458                                 if((*run_chain).mTimer.getStarted()
00459                                    && (*run_chain).mTimer.hasExpired())
00460                                 {
00461                                         PUMP_DEBUG;
00462                                         llinfos << "Error handler forgot to reset timeout. "
00463                                                         << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
00464                                                         << " seconds." << llendl;
00465                                         (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
00466                                 }
00467                         }
00468                         else
00469                         {
00470                                 PUMP_DEBUG;
00471                                 // it timed out and no one handled it, so we need to
00472                                 // retire the chain
00473 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00474                                 lldebugs << "Removing chain "
00475                                                 << (*run_chain).mChainLinks[0].mPipe
00476                                                 << " '"
00477                                                 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00478                                                 << "' because it timed out." << llendl;
00479 #else
00480 //                              lldebugs << "Removing chain "
00481 //                                              << (*run_chain).mChainLinks[0].mPipe
00482 //                                              << " because we reached the end." << llendl;
00483 #endif
00484                                 run_chain = mRunningChains.erase(run_chain);
00485                                 continue;
00486                         }
00487                 }
00488                 PUMP_DEBUG;
00489                 if((*run_chain).mLock)
00490                 {
00491                         ++run_chain;
00492                         continue;
00493                 }
00494                 PUMP_DEBUG;
00495                 mCurrentChain = run_chain;
00496                 if((*run_chain).mDescriptors.empty())
00497                 {
00498                         // if there are no conditionals, just process this chain.
00499                         process_this_chain = true;
00500                         //lldebugs << "no conditionals - processing" << llendl;
00501                 }
00502                 else
00503                 {
00504                         PUMP_DEBUG;
00505                         //lldebugs << "checking conditionals" << llendl;
00506                         // Check if this run chain was signalled. If any file
00507                         // descriptor is ready for something, then go ahead and
00508                         // process this chian.
00509                         process_this_chain = false;
00510                         if(!signalled_client.empty())
00511                         {
00512                                 PUMP_DEBUG;
00513                                 LLChainInfo::conditionals_t::iterator it;
00514                                 it = (*run_chain).mDescriptors.begin();
00515                                 LLChainInfo::conditionals_t::iterator end;
00516                                 end = (*run_chain).mDescriptors.end();
00517                                 S32 client_id = 0;
00518                                 for(; it != end; ++it)
00519                                 {
00520                                         PUMP_DEBUG;
00521                                         client_id = *((S32*)((*it).second.client_data));
00522                                         if(signalled_client.find(client_id) != not_signalled)
00523                                         {
00524                                                 process_this_chain = true;
00525                                                 break;
00526                                         }
00527                                         //llinfos << "no fd ready for this one." << llendl;
00528                                 }
00529                         }
00530                 }
00531                 if(process_this_chain)
00532                 {
00533                         PUMP_DEBUG;
00534                         if(!((*run_chain).mInit))
00535                         {
00536                                 (*run_chain).mHead = (*run_chain).mChainLinks.begin();
00537                                 (*run_chain).mInit = true;
00538                         }
00539                         PUMP_DEBUG;
00540                         processChain(*run_chain);
00541                 }
00542 
00543                 PUMP_DEBUG;
00544                 if((*run_chain).mHead == (*run_chain).mChainLinks.end())
00545                 {
00546 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00547                         lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
00548                                         << " '"
00549                                         << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00550                                         << "' because we reached the end." << llendl;
00551 #else
00552 //                      lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
00553 //                                      << " because we reached the end." << llendl;
00554 #endif
00555 
00556                         PUMP_DEBUG;
00557                         // This chain is done. Clean up any allocated memory and
00558                         // erase the chain info.
00559                         std::for_each(
00560                                 (*run_chain).mDescriptors.begin(),
00561                                 (*run_chain).mDescriptors.end(),
00562                                 ll_delete_apr_pollset_fd_client_data());
00563                         run_chain = mRunningChains.erase(run_chain);
00564 
00565                         // *NOTE: may not always need to rebuild the pollset.
00566                         mRebuildPollset = true;
00567                 }
00568                 else
00569                 {
00570                         PUMP_DEBUG;
00571                         // this chain needs more processing - just go to the next
00572                         // chain.
00573                         ++run_chain;
00574                 }
00575         }
00576 
00577         PUMP_DEBUG;
00578         // null out the chain
00579         mCurrentChain = current_chain_t();
00580         END_PUMP_DEBUG;
00581 }
00582 
00583 //bool LLPumpIO::respond(const chain_t& pipes)
00584 //{
00585 //#if LL_THREADS_APR
00586 //      LLScopedLock lock(mCallbackMutex);
00587 //#endif
00588 //      LLChainInfo info;
00589 //      links_t links;
00590 //      
00591 //      mPendingCallbacks.push_back(info);
00592 //      return true;
00593 //}
00594 
00595 bool LLPumpIO::respond(LLIOPipe* pipe)
00596 {
00597         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00598         if(NULL == pipe) return false;
00599 
00600 #if LL_THREADS_APR
00601         LLScopedLock lock(mCallbackMutex);
00602 #endif
00603         LLChainInfo info;
00604         LLLinkInfo link;
00605         link.mPipe = pipe;
00606         info.mChainLinks.push_back(link);
00607         mPendingCallbacks.push_back(info);
00608         return true;
00609 }
00610 
00611 bool LLPumpIO::respond(
00612         const links_t& links,
00613         LLIOPipe::buffer_ptr_t data,
00614         LLSD context)
00615 {
00616         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00617         // if the caller is providing a full link description, we need to
00618         // have that description matched to a particular buffer.
00619         if(!data) return false;
00620         if(links.empty()) return false;
00621 
00622 #if LL_THREADS_APR
00623         LLScopedLock lock(mCallbackMutex);
00624 #endif
00625 
00626         // Add the callback response
00627         LLChainInfo info;
00628         info.mChainLinks = links;
00629         info.mData = data;
00630         info.mContext = context;
00631         mPendingCallbacks.push_back(info);
00632         return true;
00633 }
00634 
00635 void LLPumpIO::callback()
00636 {
00637         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00638         //llinfos << "LLPumpIO::callback()" << llendl;
00639         if(true)
00640         {
00641 #if LL_THREADS_APR
00642                 LLScopedLock lock(mCallbackMutex);
00643 #endif
00644                 std::copy(
00645                         mPendingCallbacks.begin(),
00646                         mPendingCallbacks.end(),
00647                         std::back_insert_iterator<callbacks_t>(mCallbacks));
00648                 mPendingCallbacks.clear();
00649         }
00650         if(!mCallbacks.empty())
00651         {
00652                 callbacks_t::iterator it = mCallbacks.begin();
00653                 callbacks_t::iterator end = mCallbacks.end();
00654                 for(; it != end; ++it)
00655                 {
00656                         // it's always the first and last time for respone chains
00657                         (*it).mHead = (*it).mChainLinks.begin();
00658                         (*it).mInit = true;
00659                         (*it).mEOS = true;
00660                         processChain(*it);
00661                 }
00662                 mCallbacks.clear();
00663         }
00664 }
00665 
00666 void LLPumpIO::control(LLPumpIO::EControl op)
00667 {
00668 #if LL_THREADS_APR
00669         LLScopedLock lock(mChainsMutex);
00670 #endif
00671         switch(op)
00672         {
00673         case PAUSE:
00674                 mState = PAUSING;
00675                 break;
00676         case RESUME:
00677                 mState = NORMAL;
00678                 break;
00679         default:
00680                 // no-op
00681                 break;
00682         }
00683 }
00684 
00685 void LLPumpIO::initialize(apr_pool_t* pool)
00686 {
00687         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00688         if(!pool) return;
00689 #if LL_THREADS_APR
00690         // SJB: Windows defaults to NESTED and OSX defaults to UNNESTED, so use UNNESTED explicitly.
00691         apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00692         apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00693 #endif
00694         mPool = pool;
00695 }
00696 
00697 void LLPumpIO::cleanup()
00698 {
00699         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00700 #if LL_THREADS_APR
00701         if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
00702         if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
00703 #endif
00704         mChainsMutex = NULL;
00705         mCallbackMutex = NULL;
00706         if(mPollset)
00707         {
00708 //              lldebugs << "cleaning up pollset" << llendl;
00709                 apr_pollset_destroy(mPollset);
00710                 mPollset = NULL;
00711         }
00712         if(mCurrentPool)
00713         {
00714                 apr_pool_destroy(mCurrentPool);
00715                 mCurrentPool = NULL;
00716         }
00717         mPool = NULL;
00718 }
00719 
00720 void LLPumpIO::rebuildPollset()
00721 {
00722         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00723 //      lldebugs << "LLPumpIO::rebuildPollset()" << llendl;
00724         if(mPollset)
00725         {
00726                 //lldebugs << "destroying pollset" << llendl;
00727                 apr_pollset_destroy(mPollset);
00728                 mPollset = NULL;
00729         }
00730         U32 size = 0;
00731         running_chains_t::iterator run_it = mRunningChains.begin();
00732         running_chains_t::iterator run_end = mRunningChains.end();
00733         for(; run_it != run_end; ++run_it)
00734         {
00735                 size += (*run_it).mDescriptors.size();
00736         }
00737         //lldebugs << "found " << size << " descriptors." << llendl;
00738         if(size)
00739         {
00740                 // Recycle the memory pool
00741                 const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
00742                 if(mCurrentPool
00743                    && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
00744                 {
00745                         apr_pool_destroy(mCurrentPool);
00746                         mCurrentPool = NULL;
00747                         mCurrentPoolReallocCount = 0;
00748                 }
00749                 if(!mCurrentPool)
00750                 {
00751                         apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
00752                         (void)ll_apr_warn_status(status);
00753                 }
00754 
00755                 // add all of the file descriptors
00756                 run_it = mRunningChains.begin();
00757                 LLChainInfo::conditionals_t::iterator fd_it;
00758                 LLChainInfo::conditionals_t::iterator fd_end;
00759                 apr_pollset_create(&mPollset, size, mCurrentPool, 0);
00760                 for(; run_it != run_end; ++run_it)
00761                 {
00762                         fd_it = (*run_it).mDescriptors.begin();
00763                         fd_end = (*run_it).mDescriptors.end();
00764                         for(; fd_it != fd_end; ++fd_it)
00765                         {
00766                                 apr_pollset_add(mPollset, &((*fd_it).second));
00767                         }
00768                 }
00769         }
00770 }
00771 
00772 void LLPumpIO::processChain(LLChainInfo& chain)
00773 {
00774         PUMP_DEBUG;
00775         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00776         LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
00777         links_t::iterator it = chain.mHead;
00778         links_t::iterator end = chain.mChainLinks.end();
00779         bool need_process_signaled = false;
00780         bool keep_going = true;
00781         do
00782         {
00783 #if LL_DEBUG_PROCESS_LINK
00784 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00785                 llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
00786                                 << llendl;
00787 #else
00788                 llinfos << "Processing link " << (*it).mPipe << "." << llendl;
00789 #endif
00790 #endif
00791 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
00792                 if(chain.mData)
00793                 {
00794                         char* buf = NULL;
00795                         S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
00796                         if(bytes)
00797                         {
00798                                 buf = new char[bytes + 1];
00799                                 chain.mData->readAfter(
00800                                         (*it).mChannels.in(),
00801                                         NULL,
00802                                         (U8*)buf,
00803                                         bytes);
00804                                 buf[bytes] = '\0';
00805                                 llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
00806                                                 << buf << llendl;
00807                                 delete[] buf;
00808                                 buf = NULL;
00809                         }
00810                         else
00811                         {
00812                                 llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
00813                                                 << llendl;
00814                         }
00815                 }
00816 #endif
00817                 PUMP_DEBUG;
00818                 status = (*it).mPipe->process(
00819                         (*it).mChannels,
00820                         chain.mData,
00821                         chain.mEOS,
00822                         chain.mContext,
00823                         this);
00824 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
00825                 if(chain.mData)
00826                 {
00827                         char* buf = NULL;
00828                         S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
00829                         if(bytes)
00830                         {
00831                                 buf = new char[bytes + 1];
00832                                 chain.mData->readAfter(
00833                                         (*it).mChannels.out(),
00834                                         NULL,
00835                                         (U8*)buf,
00836                                         bytes);
00837                                 buf[bytes] = '\0';
00838                                 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
00839                                                 << buf << llendl;
00840                                 delete[] buf;
00841                                 buf = NULL;
00842                         }
00843                         else
00844                         {
00845                                 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
00846                                                 << llendl;
00847                         }
00848                 }
00849 #endif
00850 
00851 #if LL_DEBUG_PROCESS_RETURN_VALUE
00852                 // Only bother with the success codes - error codes are logged
00853                 // below.
00854                 if(LLIOPipe::isSuccess(status))
00855                 {
00856                         llinfos << "Pipe returned: '"
00857 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00858                                         << typeid(*((*it).mPipe)).name() << "':'"
00859 #endif
00860                                         << LLIOPipe::lookupStatusString(status) << "'" << llendl;
00861                 }
00862 #endif
00863 
00864                 PUMP_DEBUG;
00865                 switch(status)
00866                 {
00867                 case LLIOPipe::STATUS_OK:
00868                         // no-op
00869                         break;
00870                 case LLIOPipe::STATUS_STOP:
00871                         PUMP_DEBUG;
00872                         status = LLIOPipe::STATUS_OK;
00873                         chain.mHead = end;
00874                         keep_going = false;
00875                         break;
00876                 case LLIOPipe::STATUS_DONE:
00877                         PUMP_DEBUG;
00878                         status = LLIOPipe::STATUS_OK;
00879                         chain.mHead = (it + 1);
00880                         chain.mEOS = true;
00881                         break;
00882                 case LLIOPipe::STATUS_BREAK:
00883                         PUMP_DEBUG;
00884                         status = LLIOPipe::STATUS_OK;
00885                         keep_going = false;
00886                         break;
00887                 case LLIOPipe::STATUS_NEED_PROCESS:
00888                         PUMP_DEBUG;
00889                         status = LLIOPipe::STATUS_OK;
00890                         if(!need_process_signaled)
00891                         {
00892                                 need_process_signaled = true;
00893                                 chain.mHead = it;
00894                         }
00895                         break;
00896                 default:
00897                         PUMP_DEBUG;
00898                         if(LLIOPipe::isError(status))
00899                         {
00900                                 llinfos << "Pump generated pipe err: '"
00901 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00902                                                 << typeid(*((*it).mPipe)).name() << "':'"
00903 #endif
00904                                                 << LLIOPipe::lookupStatusString(status)
00905                                                 << "'" << llendl;
00906 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
00907                                 if(chain.mData)
00908                                 {
00909                                         char* buf = NULL;
00910                                         S32 bytes = chain.mData->countAfter(
00911                                                 (*it).mChannels.in(),
00912                                                 NULL);
00913                                         if(bytes)
00914                                         {
00915                                                 buf = new char[bytes + 1];
00916                                                 chain.mData->readAfter(
00917                                                         (*it).mChannels.in(),
00918                                                         NULL,
00919                                                         (U8*)buf,
00920                                                         bytes);
00921                                                 buf[bytes] = '\0';
00922                                                 llinfos << "Input After Error: " << buf << llendl;
00923                                                 delete[] buf;
00924                                                 buf = NULL;
00925                                         }
00926                                         else
00927                                         {
00928                                                 llinfos << "Input After Error: (null)" << llendl;
00929                                         }
00930                                 }
00931                                 else
00932                                 {
00933                                         llinfos << "Input After Error: (null)" << llendl;
00934                                 }
00935 #endif
00936                                 keep_going = false;
00937                                 chain.mHead  = it;
00938                                 if(!handleChainError(chain, status))
00939                                 {
00940                                         chain.mHead = end;
00941                                 }
00942                         }
00943                         else
00944                         {
00945                                 llinfos << "Unhandled status code: " << status << ":"
00946                                                 << LLIOPipe::lookupStatusString(status) << llendl;
00947                         }
00948                         break;
00949                 }
00950                 PUMP_DEBUG;
00951         } while(keep_going && (++it != end));
00952         PUMP_DEBUG;
00953 }
00954 
00955 bool LLPumpIO::handleChainError(
00956         LLChainInfo& chain,
00957         LLIOPipe::EStatus error)
00958 {
00959         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00960         links_t::reverse_iterator rit;
00961         if(chain.mHead == chain.mChainLinks.end())
00962         {
00963                 rit = links_t::reverse_iterator(chain.mHead);
00964         }
00965         else
00966         {
00967                 rit = links_t::reverse_iterator(chain.mHead + 1);
00968         }
00969         
00970         links_t::reverse_iterator rend = chain.mChainLinks.rend();
00971         bool handled = false;
00972         bool keep_going = true;
00973         do
00974         {
00975 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00976                 lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
00977                                  << "." << llendl;
00978 #endif
00979                 error = (*rit).mPipe->handleError(error, this);
00980                 switch(error)
00981                 {
00982                 case LLIOPipe::STATUS_OK:
00983                         handled = true;
00984                         chain.mHead = rit.base();
00985                         break;
00986                 case LLIOPipe::STATUS_STOP:
00987                 case LLIOPipe::STATUS_DONE:
00988                 case LLIOPipe::STATUS_BREAK:
00989                 case LLIOPipe::STATUS_NEED_PROCESS:
00990 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00991                         lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
00992                                          << " returned code to stop error handler." << llendl;
00993 #endif
00994                         keep_going = false;
00995                         break;
00996                 default:
00997                         if(LLIOPipe::isSuccess(error))
00998                         {
00999                                 llinfos << "Unhandled status code: " << error << ":"
01000                                                 << LLIOPipe::lookupStatusString(error) << llendl;
01001                                 error = LLIOPipe::STATUS_ERROR;
01002                                 keep_going = false;
01003                         }
01004                         break;
01005                 }
01006         } while(keep_going && !handled && (++rit != rend));
01007         return handled;
01008 }
01009 
01014 LLPumpIO::LLChainInfo::LLChainInfo() :
01015         mInit(false),
01016         mLock(0),
01017         mEOS(false)
01018 {
01019         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01020         mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
01021 }
01022 
01023 void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
01024 {
01025         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01026         if(timeout > 0.0f)
01027         {
01028                 mTimer.start();
01029                 mTimer.reset();
01030                 mTimer.setTimerExpirySec(timeout);
01031         }
01032         else
01033         {
01034                 mTimer.stop();
01035         }
01036 }

Generated on Thu Jul 1 06:09:03 2010 for Second Life Viewer by  doxygen 1.4.7