llpumpio.cpp

Go to the documentation of this file.
00001 
00034 #include "linden_common.h"
00035 #include "llpumpio.h"
00036 
00037 #include <map>
00038 #include <set>
00039 #include "apr-1/apr_poll.h"
00040 
00041 #include "llapr.h"
00042 #include "llmemtype.h"
00043 #include "llstl.h"
00044 
00045 // These should not be enabled in production, but they can be
00046 // intensely useful during development for finding certain kinds of
00047 // bugs.
00048 #if LL_LINUX
00049 //#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1
00050 //#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1
00051 #if LL_DEBUG_POLL_FILE_DESCRIPTORS
00052 #include "apr-1/apr_portable.h"
00053 #endif
00054 #endif
00055 
00056 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00057 #include <typeinfo>
00058 #endif
00059 
00060 // constants for poll timeout. if we are threading, we want to have a
00061 // longer poll timeout.
00062 #if LL_THREADS_APR
00063 static const S32 DEFAULT_POLL_TIMEOUT = 1000;
00064 #else
00065 static const S32 DEFAULT_POLL_TIMEOUT = 0;
00066 #endif
00067 
00068 // The default (and fallback) expiration time for chains
00069 const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
00070 extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
00071 extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
00072 
00073 // sorta spammy debug modes.
00074 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1
00075 //#define LL_DEBUG_PROCESS_LINK 1
00076 //#define LL_DEBUG_PROCESS_RETURN_VALUE 1
00077 
00078 // Super spammy debug mode.
00079 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
00080 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
00081 
00082 //
00083 // local functions
00084 //
00085 void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
00086 {
00087 #if LL_DEBUG_POLL_FILE_DESCRIPTORS
00088         if(!poll)
00089         {
00090                 lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
00091                 return;
00092         }
00093         if(poll->desc.s)
00094         {
00095                 apr_os_sock_t os_sock;
00096                 if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
00097                 {
00098                         lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
00099                                  << " at " << poll->desc.s << llendl;
00100                 }
00101                 else
00102                 {
00103                         lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
00104                                  << " at " << poll->desc.s << llendl;
00105                 }
00106         }
00107         else if(poll->desc.f)
00108         {
00109                 apr_os_file_t os_file;
00110                 if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
00111                 {
00112                         lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
00113                                  << " at " << poll->desc.f << llendl;
00114                 }
00115                 else
00116                 {
00117                         lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
00118                                  << " at " << poll->desc.f << llendl;
00119                 }
00120         }
00121         else
00122         {
00123                 lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
00124         }
00125 #endif  
00126 }
00127 
00131 class LLChainSleeper : public LLRunnable
00132 {
00133 public:
00134         static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
00135         {
00136                 return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
00137         }
00138         
00139         virtual void run(LLRunner* runner, S64 handle)
00140         {
00141                 mPump->clearLock(mKey);
00142         }
00143 
00144 protected:
00145         LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
00146         LLPumpIO* mPump;
00147         S32 mKey;
00148 };
00149 
00150 
00155 struct ll_delete_apr_pollset_fd_client_data
00156 {
00157         typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
00158         void operator()(const pipe_conditional_t& conditional)
00159         {
00160                 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00161                 S32* client_id = (S32*)conditional.second.client_data;
00162                 delete client_id;
00163         }
00164 };
00165 
00169 LLPumpIO::LLPumpIO(apr_pool_t* pool) :
00170         mState(LLPumpIO::NORMAL),
00171         mRebuildPollset(false),
00172         mPollset(NULL),
00173         mPollsetClientID(0),
00174         mNextLock(0),
00175         mPool(NULL),
00176         mCurrentPool(NULL),
00177         mCurrentPoolReallocCount(0),
00178         mChainsMutex(NULL),
00179         mCallbackMutex(NULL)
00180 {
00181         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00182         initialize(pool);
00183 }
00184 
00185 LLPumpIO::~LLPumpIO()
00186 {
00187         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00188         cleanup();
00189 }
00190 
00191 bool LLPumpIO::prime(apr_pool_t* pool)
00192 {
00193         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00194         cleanup();
00195         initialize(pool);
00196         return ((pool == NULL) ? false : true);
00197 }
00198 
00199 bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
00200 {
00201         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00202         if(chain.empty()) return false;
00203 
00204 #if LL_THREADS_APR
00205         LLScopedLock lock(mChainsMutex);
00206 #endif
00207         LLChainInfo info;
00208         info.setTimeoutSeconds(timeout);
00209         info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
00210         LLLinkInfo link;
00211 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00212         lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
00213                 << typeid(*(chain[0])).name() << "'" << llendl;
00214 #else
00215         lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
00216 #endif
00217         chain_t::const_iterator it = chain.begin();
00218         chain_t::const_iterator end = chain.end();
00219         for(; it != end; ++it)
00220         {
00221                 link.mPipe = (*it);
00222                 link.mChannels = info.mData->nextChannel();
00223                 info.mChainLinks.push_back(link);
00224         }
00225         mPendingChains.push_back(info);
00226         return true;
00227 }
00228 
00229 bool LLPumpIO::addChain(
00230         const LLPumpIO::links_t& links,
00231         LLIOPipe::buffer_ptr_t data,
00232         LLSD context,
00233         F32 timeout)
00234 {
00235         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00236 
00237         // remember that if the caller is providing a full link
00238         // description, we need to have that description matched to a
00239         // particular buffer.
00240         if(!data) return false;
00241         if(links.empty()) return false;
00242 
00243 #if LL_THREADS_APR
00244         LLScopedLock lock(mChainsMutex);
00245 #endif
00246 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00247         lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
00248                 << typeid(*(links[0].mPipe)).name() << "'" << llendl;
00249 #else
00250         lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
00251 #endif
00252         LLChainInfo info;
00253         info.setTimeoutSeconds(timeout);
00254         info.mChainLinks = links;
00255         info.mData = data;
00256         info.mContext = context;
00257         mPendingChains.push_back(info);
00258         return true;
00259 }
00260 
00261 bool LLPumpIO::setTimeoutSeconds(F32 timeout)
00262 {
00263         // If no chain is running, return failure.
00264         if(current_chain_t() == mCurrentChain)
00265         {
00266                 return false;
00267         }
00268         (*mCurrentChain).setTimeoutSeconds(timeout);
00269         return true;
00270 }
00271 
00272 static std::string events_2_string(apr_int16_t events)
00273 {
00274         std::ostringstream ostr;
00275         if(events & APR_POLLIN)
00276         {
00277                 ostr << "read,";
00278         }
00279         if(events & APR_POLLPRI)
00280         {
00281                 ostr << "priority,";
00282         }
00283         if(events & APR_POLLOUT)
00284         {
00285                 ostr << "write,";
00286         }
00287         if(events & APR_POLLERR)
00288         {
00289                 ostr << "error,";
00290         }
00291         if(events & APR_POLLHUP)
00292         {
00293                 ostr << "hangup,";
00294         }
00295         if(events & APR_POLLNVAL)
00296         {
00297                 ostr << "invalid,";
00298         }
00299         return chop_tail_copy(ostr.str(), 1);
00300 }
00301 
00302 bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
00303 {
00304         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00305         if(!pipe) return false;
00306         ll_debug_poll_fd("Set conditional", poll);
00307 
00308         lldebugs << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null")
00309                  << ") "
00310 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00311                  << "on pipe " << typeid(*pipe).name() 
00312 #endif
00313                  << " at " << pipe << llendl;
00314 
00315         // remove any matching poll file descriptors for this pipe.
00316         LLIOPipe::ptr_t pipe_ptr(pipe);
00317         LLChainInfo::conditionals_t::iterator it;
00318         it = (*mCurrentChain).mDescriptors.begin();
00319         while(it != (*mCurrentChain).mDescriptors.end())
00320         {
00321                 LLChainInfo::pipe_conditional_t& value = (*it);
00322                 if(pipe_ptr == value.first)
00323                 {
00324                         ll_delete_apr_pollset_fd_client_data()(value);
00325                         it = (*mCurrentChain).mDescriptors.erase(it);
00326                         mRebuildPollset = true;
00327                 }
00328                 else
00329                 {
00330                         ++it;
00331                 }
00332         }
00333 
00334         if(!poll)
00335         {
00336                 mRebuildPollset = true;
00337                 return true;
00338         }
00339         LLChainInfo::pipe_conditional_t value;
00340         value.first = pipe_ptr;
00341         value.second = *poll;
00342         value.second.rtnevents = 0;
00343         if(!poll->p)
00344         {
00345                 // each fd needs a pool to work with, so if one was
00346                 // not specified, use this pool.
00347                 // *FIX: Should it always be this pool?
00348                 value.second.p = mPool;
00349         }
00350         value.second.client_data = new S32(++mPollsetClientID);
00351         (*mCurrentChain).mDescriptors.push_back(value);
00352         mRebuildPollset = true;
00353         return true;
00354 }
00355 
00356 S32 LLPumpIO::setLock()
00357 {
00358         // *NOTE: I do not think it is necessary to acquire a mutex here
00359         // since this should only be called during the pump(), and should
00360         // only change the running chain. Any other use of this method is
00361         // incorrect usage. If it becomes necessary to acquire a lock
00362         // here, be sure to lock here and call a protected method to get
00363         // the lock, and sleepChain() should probably acquire the same
00364         // lock while and calling the same protected implementation to
00365         // lock the runner at the same time.
00366 
00367         // If no chain is running, return failure.
00368         if(current_chain_t() == mCurrentChain)
00369         {
00370                 return 0;
00371         }
00372 
00373         // deal with wrap.
00374         if(++mNextLock <= 0)
00375         {
00376                 mNextLock = 1;
00377         }
00378 
00379         // set the lock
00380         (*mCurrentChain).mLock = mNextLock;
00381         return mNextLock;
00382 }
00383 
00384 void LLPumpIO::clearLock(S32 key)
00385 {
00386         // We need to lock it here since we do not want to be iterating
00387         // over the chains twice. We can safely call process() while this
00388         // is happening since we should not be erasing a locked pipe, and
00389         // therefore won't be treading into deleted memory. I think we can
00390         // also clear the lock on the chain safely since the pump only
00391         // reads that value.
00392 #if LL_THREADS_APR
00393         LLScopedLock lock(mChainsMutex);
00394 #endif
00395         mClearLocks.insert(key);
00396 }
00397 
00398 bool LLPumpIO::sleepChain(F64 seconds)
00399 {
00400         // Much like the call to setLock(), this should only be called
00401         // from one chain during processing, so there is no need to
00402         // acquire a mutex.
00403         if(seconds <= 0.0) return false;
00404         S32 key = setLock();
00405         if(!key) return false;
00406         LLRunner::run_handle_t handle = mRunner.addRunnable(
00407                 LLChainSleeper::build(this, key),
00408                 LLRunner::RUN_IN,
00409                 seconds);
00410         if(0 == handle) return false;
00411         return true;
00412 }
00413 
00414 bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
00415 {
00416         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00417         if(current_chain_t() == mCurrentChain)
00418         {
00419                 return false;
00420         }
00421         std::copy(
00422                 (*mCurrentChain).mChainLinks.begin(),
00423                 (*mCurrentChain).mChainLinks.end(),
00424                 std::back_insert_iterator<links_t>(links));
00425         return true;
00426 }
00427 
00428 void LLPumpIO::pump()
00429 {
00430         pump(DEFAULT_POLL_TIMEOUT);
00431 }
00432 
00433 //timeout is in microseconds
00434 void LLPumpIO::pump(const S32& poll_timeout)
00435 {
00436         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00437         LLFastTimer t1(LLFastTimer::FTM_PUMP);
00438         //llinfos << "LLPumpIO::pump()" << llendl;
00439 
00440         // Run any pending runners.
00441         mRunner.run();
00442 
00443         // We need to move all of the pending heads over to the running
00444         // chains.
00445         PUMP_DEBUG;
00446         if(true)
00447         {
00448 #if LL_THREADS_APR
00449                 LLScopedLock lock(mChainsMutex);
00450 #endif
00451                 // bail if this pump is paused.
00452                 if(PAUSING == mState)
00453                 {
00454                         mState = PAUSED;
00455                 }
00456                 if(PAUSED == mState)
00457                 {
00458                         return;
00459                 }
00460 
00461                 PUMP_DEBUG;
00462                 // Move the pending chains over to the running chaings
00463                 if(!mPendingChains.empty())
00464                 {
00465                         PUMP_DEBUG;
00466                         //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl;
00467                         std::copy(
00468                                 mPendingChains.begin(),
00469                                 mPendingChains.end(),
00470                                 std::back_insert_iterator<running_chains_t>(mRunningChains));
00471                         mPendingChains.clear();
00472                         PUMP_DEBUG;
00473                 }
00474 
00475                 // Clear any locks. This needs to be done here so that we do
00476                 // not clash during a call to clearLock().
00477                 if(!mClearLocks.empty())
00478                 {
00479                         PUMP_DEBUG;
00480                         running_chains_t::iterator it = mRunningChains.begin();
00481                         running_chains_t::iterator end = mRunningChains.end();
00482                         std::set<S32>::iterator not_cleared = mClearLocks.end();
00483                         for(; it != end; ++it)
00484                         {
00485                                 if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
00486                                 {
00487                                         (*it).mLock = 0;
00488                                 }
00489                         }
00490                         PUMP_DEBUG;
00491                         mClearLocks.clear();
00492                 }
00493         }
00494 
00495         PUMP_DEBUG;
00496         // rebuild the pollset if necessary
00497         if(mRebuildPollset)
00498         {
00499                 PUMP_DEBUG;
00500                 rebuildPollset();
00501                 mRebuildPollset = false;
00502         }
00503 
00504         // Poll based on the last known pollset
00505         // *TODO: may want to pass in a poll timeout so it works correctly
00506         // in single and multi threaded processes.
00507         PUMP_DEBUG;
00508         typedef std::map<S32, S32> signal_client_t;
00509         signal_client_t signalled_client;
00510         const apr_pollfd_t* poll_fd = NULL;
00511         if(mPollset)
00512         {
00513                 PUMP_DEBUG;
00514                 //llinfos << "polling" << llendl;
00515                 S32 count = 0;
00516                 S32 client_id = 0;
00517                 apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
00518                 PUMP_DEBUG;
00519                 for(S32 ii = 0; ii < count; ++ii)
00520                 {
00521                         ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
00522                         client_id = *((S32*)poll_fd[ii].client_data);
00523                         signalled_client[client_id] = ii;
00524                 }
00525                 PUMP_DEBUG;
00526         }
00527 
00528         PUMP_DEBUG;
00529         // set up for a check to see if each one was signalled
00530         signal_client_t::iterator not_signalled = signalled_client.end();
00531 
00532         // Process everything as appropriate
00533         //lldebugs << "Running chain count: " << mRunningChains.size() << llendl;
00534         running_chains_t::iterator run_chain = mRunningChains.begin();
00535         bool process_this_chain = false;
00536         for(; run_chain != mRunningChains.end(); )
00537         {
00538                 PUMP_DEBUG;
00539                 if((*run_chain).mInit
00540                    && (*run_chain).mTimer.getStarted()
00541                    && (*run_chain).mTimer.hasExpired())
00542                 {
00543                         PUMP_DEBUG;
00544                         if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
00545                         {
00546                                 // the pipe probably handled the error. If the handler
00547                                 // forgot to reset the expiration then we need to do
00548                                 // that here.
00549                                 if((*run_chain).mTimer.getStarted()
00550                                    && (*run_chain).mTimer.hasExpired())
00551                                 {
00552                                         PUMP_DEBUG;
00553                                         llinfos << "Error handler forgot to reset timeout. "
00554                                                         << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
00555                                                         << " seconds." << llendl;
00556                                         (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
00557                                 }
00558                         }
00559                         else
00560                         {
00561                                 PUMP_DEBUG;
00562                                 // it timed out and no one handled it, so we need to
00563                                 // retire the chain
00564 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00565                                 lldebugs << "Removing chain "
00566                                                 << (*run_chain).mChainLinks[0].mPipe
00567                                                 << " '"
00568                                                 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00569                                                 << "' because it timed out." << llendl;
00570 #else
00571 //                              lldebugs << "Removing chain "
00572 //                                              << (*run_chain).mChainLinks[0].mPipe
00573 //                                              << " because we reached the end." << llendl;
00574 #endif
00575                                 run_chain = mRunningChains.erase(run_chain);
00576                                 continue;
00577                         }
00578                 }
00579                 PUMP_DEBUG;
00580                 if((*run_chain).mLock)
00581                 {
00582                         ++run_chain;
00583                         continue;
00584                 }
00585                 PUMP_DEBUG;
00586                 mCurrentChain = run_chain;
00587                 if((*run_chain).mDescriptors.empty())
00588                 {
00589                         // if there are no conditionals, just process this chain.
00590                         process_this_chain = true;
00591                         //lldebugs << "no conditionals - processing" << llendl;
00592                 }
00593                 else
00594                 {
00595                         PUMP_DEBUG;
00596                         //lldebugs << "checking conditionals" << llendl;
00597                         // Check if this run chain was signalled. If any file
00598                         // descriptor is ready for something, then go ahead and
00599                         // process this chian.
00600                         process_this_chain = false;
00601                         if(!signalled_client.empty())
00602                         {
00603                                 PUMP_DEBUG;
00604                                 LLChainInfo::conditionals_t::iterator it;
00605                                 it = (*run_chain).mDescriptors.begin();
00606                                 LLChainInfo::conditionals_t::iterator end;
00607                                 end = (*run_chain).mDescriptors.end();
00608                                 S32 client_id = 0;
00609                                 signal_client_t::iterator signal;
00610                                 for(; it != end; ++it)
00611                                 {
00612                                         PUMP_DEBUG;
00613                                         client_id = *((S32*)((*it).second.client_data));
00614                                         signal = signalled_client.find(client_id);
00615                                         if (signal == not_signalled) continue;
00616                                         static const apr_int16_t POLL_CHAIN_ERROR =
00617                                                 APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
00618                                         const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
00619                                         if(poll->rtnevents & POLL_CHAIN_ERROR)
00620                                         {
00621                                                 // Potential eror condition has been
00622                                                 // returned. If HUP was one of them, we pass
00623                                                 // that as the error even though there may be
00624                                                 // more. If there are in fact more errors,
00625                                                 // we'll just wait for that detection until
00626                                                 // the next pump() cycle to catch it so that
00627                                                 // the logic here gets no more strained than
00628                                                 // it already is.
00629                                                 LLIOPipe::EStatus error_status;
00630                                                 if(poll->rtnevents & APR_POLLHUP)
00631                                                         error_status = LLIOPipe::STATUS_LOST_CONNECTION;
00632                                                 else
00633                                                         error_status = LLIOPipe::STATUS_ERROR;
00634                                                 if(handleChainError(*run_chain, error_status)) break;
00635                                                 ll_debug_poll_fd("Removing pipe", poll);
00636                                                 llwarns << "Removing pipe "
00637                                                         << (*run_chain).mChainLinks[0].mPipe
00638                                                         << " '"
00639 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00640                                                         << typeid(
00641                                                                 *((*run_chain).mChainLinks[0].mPipe)).name()
00642 #endif
00643                                                         << "' because: "
00644                                                         << events_2_string(poll->rtnevents)
00645                                                         << llendl;
00646                                                 (*run_chain).mHead = (*run_chain).mChainLinks.end();
00647                                                 break;
00648                                         }
00649 
00650                                         // at least 1 fd got signalled, and there were no
00651                                         // errors. That means we process this chain.
00652                                         process_this_chain = true;
00653                                         break;
00654                                 }
00655                         }
00656                 }
00657                 if(process_this_chain)
00658                 {
00659                         PUMP_DEBUG;
00660                         if(!((*run_chain).mInit))
00661                         {
00662                                 (*run_chain).mHead = (*run_chain).mChainLinks.begin();
00663                                 (*run_chain).mInit = true;
00664                         }
00665                         PUMP_DEBUG;
00666                         processChain(*run_chain);
00667                 }
00668 
00669                 PUMP_DEBUG;
00670                 if((*run_chain).mHead == (*run_chain).mChainLinks.end())
00671                 {
00672 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00673                         lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
00674                                         << " '"
00675                                         << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00676                                         << "' because we reached the end." << llendl;
00677 #else
00678 //                      lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
00679 //                                      << " because we reached the end." << llendl;
00680 #endif
00681 
00682                         PUMP_DEBUG;
00683                         // This chain is done. Clean up any allocated memory and
00684                         // erase the chain info.
00685                         std::for_each(
00686                                 (*run_chain).mDescriptors.begin(),
00687                                 (*run_chain).mDescriptors.end(),
00688                                 ll_delete_apr_pollset_fd_client_data());
00689                         run_chain = mRunningChains.erase(run_chain);
00690 
00691                         // *NOTE: may not always need to rebuild the pollset.
00692                         mRebuildPollset = true;
00693                 }
00694                 else
00695                 {
00696                         PUMP_DEBUG;
00697                         // this chain needs more processing - just go to the next
00698                         // chain.
00699                         ++run_chain;
00700                 }
00701         }
00702 
00703         PUMP_DEBUG;
00704         // null out the chain
00705         mCurrentChain = current_chain_t();
00706         END_PUMP_DEBUG;
00707 }
00708 
00709 //bool LLPumpIO::respond(const chain_t& pipes)
00710 //{
00711 //#if LL_THREADS_APR
00712 //      LLScopedLock lock(mCallbackMutex);
00713 //#endif
00714 //      LLChainInfo info;
00715 //      links_t links;
00716 //      
00717 //      mPendingCallbacks.push_back(info);
00718 //      return true;
00719 //}
00720 
00721 bool LLPumpIO::respond(LLIOPipe* pipe)
00722 {
00723         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00724         if(NULL == pipe) return false;
00725 
00726 #if LL_THREADS_APR
00727         LLScopedLock lock(mCallbackMutex);
00728 #endif
00729         LLChainInfo info;
00730         LLLinkInfo link;
00731         link.mPipe = pipe;
00732         info.mChainLinks.push_back(link);
00733         mPendingCallbacks.push_back(info);
00734         return true;
00735 }
00736 
00737 bool LLPumpIO::respond(
00738         const links_t& links,
00739         LLIOPipe::buffer_ptr_t data,
00740         LLSD context)
00741 {
00742         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00743         // if the caller is providing a full link description, we need to
00744         // have that description matched to a particular buffer.
00745         if(!data) return false;
00746         if(links.empty()) return false;
00747 
00748 #if LL_THREADS_APR
00749         LLScopedLock lock(mCallbackMutex);
00750 #endif
00751 
00752         // Add the callback response
00753         LLChainInfo info;
00754         info.mChainLinks = links;
00755         info.mData = data;
00756         info.mContext = context;
00757         mPendingCallbacks.push_back(info);
00758         return true;
00759 }
00760 
00761 void LLPumpIO::callback()
00762 {
00763         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00764         //llinfos << "LLPumpIO::callback()" << llendl;
00765         if(true)
00766         {
00767 #if LL_THREADS_APR
00768                 LLScopedLock lock(mCallbackMutex);
00769 #endif
00770                 std::copy(
00771                         mPendingCallbacks.begin(),
00772                         mPendingCallbacks.end(),
00773                         std::back_insert_iterator<callbacks_t>(mCallbacks));
00774                 mPendingCallbacks.clear();
00775         }
00776         if(!mCallbacks.empty())
00777         {
00778                 callbacks_t::iterator it = mCallbacks.begin();
00779                 callbacks_t::iterator end = mCallbacks.end();
00780                 for(; it != end; ++it)
00781                 {
00782                         // it's always the first and last time for respone chains
00783                         (*it).mHead = (*it).mChainLinks.begin();
00784                         (*it).mInit = true;
00785                         (*it).mEOS = true;
00786                         processChain(*it);
00787                 }
00788                 mCallbacks.clear();
00789         }
00790 }
00791 
00792 void LLPumpIO::control(LLPumpIO::EControl op)
00793 {
00794 #if LL_THREADS_APR
00795         LLScopedLock lock(mChainsMutex);
00796 #endif
00797         switch(op)
00798         {
00799         case PAUSE:
00800                 mState = PAUSING;
00801                 break;
00802         case RESUME:
00803                 mState = NORMAL;
00804                 break;
00805         default:
00806                 // no-op
00807                 break;
00808         }
00809 }
00810 
00811 void LLPumpIO::initialize(apr_pool_t* pool)
00812 {
00813         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00814         if(!pool) return;
00815 #if LL_THREADS_APR
00816         // SJB: Windows defaults to NESTED and OSX defaults to UNNESTED, so use UNNESTED explicitly.
00817         apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00818         apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00819 #endif
00820         mPool = pool;
00821 }
00822 
00823 void LLPumpIO::cleanup()
00824 {
00825         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00826 #if LL_THREADS_APR
00827         if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
00828         if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
00829 #endif
00830         mChainsMutex = NULL;
00831         mCallbackMutex = NULL;
00832         if(mPollset)
00833         {
00834 //              lldebugs << "cleaning up pollset" << llendl;
00835                 apr_pollset_destroy(mPollset);
00836                 mPollset = NULL;
00837         }
00838         if(mCurrentPool)
00839         {
00840                 apr_pool_destroy(mCurrentPool);
00841                 mCurrentPool = NULL;
00842         }
00843         mPool = NULL;
00844 }
00845 
00846 void LLPumpIO::rebuildPollset()
00847 {
00848         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00849 //      lldebugs << "LLPumpIO::rebuildPollset()" << llendl;
00850         if(mPollset)
00851         {
00852                 //lldebugs << "destroying pollset" << llendl;
00853                 apr_pollset_destroy(mPollset);
00854                 mPollset = NULL;
00855         }
00856         U32 size = 0;
00857         running_chains_t::iterator run_it = mRunningChains.begin();
00858         running_chains_t::iterator run_end = mRunningChains.end();
00859         for(; run_it != run_end; ++run_it)
00860         {
00861                 size += (*run_it).mDescriptors.size();
00862         }
00863         //lldebugs << "found " << size << " descriptors." << llendl;
00864         if(size)
00865         {
00866                 // Recycle the memory pool
00867                 const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
00868                 if(mCurrentPool
00869                    && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
00870                 {
00871                         apr_pool_destroy(mCurrentPool);
00872                         mCurrentPool = NULL;
00873                         mCurrentPoolReallocCount = 0;
00874                 }
00875                 if(!mCurrentPool)
00876                 {
00877                         apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
00878                         (void)ll_apr_warn_status(status);
00879                 }
00880 
00881                 // add all of the file descriptors
00882                 run_it = mRunningChains.begin();
00883                 LLChainInfo::conditionals_t::iterator fd_it;
00884                 LLChainInfo::conditionals_t::iterator fd_end;
00885                 apr_pollset_create(&mPollset, size, mCurrentPool, 0);
00886                 for(; run_it != run_end; ++run_it)
00887                 {
00888                         fd_it = (*run_it).mDescriptors.begin();
00889                         fd_end = (*run_it).mDescriptors.end();
00890                         for(; fd_it != fd_end; ++fd_it)
00891                         {
00892                                 apr_pollset_add(mPollset, &((*fd_it).second));
00893                         }
00894                 }
00895         }
00896 }
00897 
00898 void LLPumpIO::processChain(LLChainInfo& chain)
00899 {
00900         PUMP_DEBUG;
00901         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00902         LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
00903         links_t::iterator it = chain.mHead;
00904         links_t::iterator end = chain.mChainLinks.end();
00905         bool need_process_signaled = false;
00906         bool keep_going = true;
00907         do
00908         {
00909 #if LL_DEBUG_PROCESS_LINK
00910 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00911                 llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
00912                                 << llendl;
00913 #else
00914                 llinfos << "Processing link " << (*it).mPipe << "." << llendl;
00915 #endif
00916 #endif
00917 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
00918                 if(chain.mData)
00919                 {
00920                         char* buf = NULL;
00921                         S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
00922                         if(bytes)
00923                         {
00924                                 buf = new char[bytes + 1];
00925                                 chain.mData->readAfter(
00926                                         (*it).mChannels.in(),
00927                                         NULL,
00928                                         (U8*)buf,
00929                                         bytes);
00930                                 buf[bytes] = '\0';
00931                                 llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
00932                                                 << buf << llendl;
00933                                 delete[] buf;
00934                                 buf = NULL;
00935                         }
00936                         else
00937                         {
00938                                 llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
00939                                                 << llendl;
00940                         }
00941                 }
00942 #endif
00943                 PUMP_DEBUG;
00944                 status = (*it).mPipe->process(
00945                         (*it).mChannels,
00946                         chain.mData,
00947                         chain.mEOS,
00948                         chain.mContext,
00949                         this);
00950 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
00951                 if(chain.mData)
00952                 {
00953                         char* buf = NULL;
00954                         S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
00955                         if(bytes)
00956                         {
00957                                 buf = new char[bytes + 1];
00958                                 chain.mData->readAfter(
00959                                         (*it).mChannels.out(),
00960                                         NULL,
00961                                         (U8*)buf,
00962                                         bytes);
00963                                 buf[bytes] = '\0';
00964                                 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
00965                                                 << buf << llendl;
00966                                 delete[] buf;
00967                                 buf = NULL;
00968                         }
00969                         else
00970                         {
00971                                 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
00972                                                 << llendl;
00973                         }
00974                 }
00975 #endif
00976 
00977 #if LL_DEBUG_PROCESS_RETURN_VALUE
00978                 // Only bother with the success codes - error codes are logged
00979                 // below.
00980                 if(LLIOPipe::isSuccess(status))
00981                 {
00982                         llinfos << "Pipe returned: '"
00983 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00984                                         << typeid(*((*it).mPipe)).name() << "':'"
00985 #endif
00986                                         << LLIOPipe::lookupStatusString(status) << "'" << llendl;
00987                 }
00988 #endif
00989 
00990                 PUMP_DEBUG;
00991                 switch(status)
00992                 {
00993                 case LLIOPipe::STATUS_OK:
00994                         // no-op
00995                         break;
00996                 case LLIOPipe::STATUS_STOP:
00997                         PUMP_DEBUG;
00998                         status = LLIOPipe::STATUS_OK;
00999                         chain.mHead = end;
01000                         keep_going = false;
01001                         break;
01002                 case LLIOPipe::STATUS_DONE:
01003                         PUMP_DEBUG;
01004                         status = LLIOPipe::STATUS_OK;
01005                         chain.mHead = (it + 1);
01006                         chain.mEOS = true;
01007                         break;
01008                 case LLIOPipe::STATUS_BREAK:
01009                         PUMP_DEBUG;
01010                         status = LLIOPipe::STATUS_OK;
01011                         keep_going = false;
01012                         break;
01013                 case LLIOPipe::STATUS_NEED_PROCESS:
01014                         PUMP_DEBUG;
01015                         status = LLIOPipe::STATUS_OK;
01016                         if(!need_process_signaled)
01017                         {
01018                                 need_process_signaled = true;
01019                                 chain.mHead = it;
01020                         }
01021                         break;
01022                 default:
01023                         PUMP_DEBUG;
01024                         if(LLIOPipe::isError(status))
01025                         {
01026                                 llinfos << "Pump generated pipe err: '"
01027 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
01028                                                 << typeid(*((*it).mPipe)).name() << "':'"
01029 #endif
01030                                                 << LLIOPipe::lookupStatusString(status)
01031                                                 << "'" << llendl;
01032 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
01033                                 if(chain.mData)
01034                                 {
01035                                         char* buf = NULL;
01036                                         S32 bytes = chain.mData->countAfter(
01037                                                 (*it).mChannels.in(),
01038                                                 NULL);
01039                                         if(bytes)
01040                                         {
01041                                                 buf = new char[bytes + 1];
01042                                                 chain.mData->readAfter(
01043                                                         (*it).mChannels.in(),
01044                                                         NULL,
01045                                                         (U8*)buf,
01046                                                         bytes);
01047                                                 buf[bytes] = '\0';
01048                                                 llinfos << "Input After Error: " << buf << llendl;
01049                                                 delete[] buf;
01050                                                 buf = NULL;
01051                                         }
01052                                         else
01053                                         {
01054                                                 llinfos << "Input After Error: (null)" << llendl;
01055                                         }
01056                                 }
01057                                 else
01058                                 {
01059                                         llinfos << "Input After Error: (null)" << llendl;
01060                                 }
01061 #endif
01062                                 keep_going = false;
01063                                 chain.mHead  = it;
01064                                 if(!handleChainError(chain, status))
01065                                 {
01066                                         chain.mHead = end;
01067                                 }
01068                         }
01069                         else
01070                         {
01071                                 llinfos << "Unhandled status code: " << status << ":"
01072                                                 << LLIOPipe::lookupStatusString(status) << llendl;
01073                         }
01074                         break;
01075                 }
01076                 PUMP_DEBUG;
01077         } while(keep_going && (++it != end));
01078         PUMP_DEBUG;
01079 }
01080 
01081 bool LLPumpIO::handleChainError(
01082         LLChainInfo& chain,
01083         LLIOPipe::EStatus error)
01084 {
01085         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01086         links_t::reverse_iterator rit;
01087         if(chain.mHead == chain.mChainLinks.end())
01088         {
01089                 rit = links_t::reverse_iterator(chain.mHead);
01090         }
01091         else
01092         {
01093                 rit = links_t::reverse_iterator(chain.mHead + 1);
01094         }
01095         
01096         links_t::reverse_iterator rend = chain.mChainLinks.rend();
01097         bool handled = false;
01098         bool keep_going = true;
01099         do
01100         {
01101 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
01102                 lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
01103                                  << "." << llendl;
01104 #endif
01105                 error = (*rit).mPipe->handleError(error, this);
01106                 switch(error)
01107                 {
01108                 case LLIOPipe::STATUS_OK:
01109                         handled = true;
01110                         chain.mHead = rit.base();
01111                         break;
01112                 case LLIOPipe::STATUS_STOP:
01113                 case LLIOPipe::STATUS_DONE:
01114                 case LLIOPipe::STATUS_BREAK:
01115                 case LLIOPipe::STATUS_NEED_PROCESS:
01116 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
01117                         lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
01118                                          << " returned code to stop error handler." << llendl;
01119 #endif
01120                         keep_going = false;
01121                         break;
01122                 default:
01123                         if(LLIOPipe::isSuccess(error))
01124                         {
01125                                 llinfos << "Unhandled status code: " << error << ":"
01126                                                 << LLIOPipe::lookupStatusString(error) << llendl;
01127                                 error = LLIOPipe::STATUS_ERROR;
01128                                 keep_going = false;
01129                         }
01130                         break;
01131                 }
01132         } while(keep_going && !handled && (++rit != rend));
01133         return handled;
01134 }
01135 
01140 LLPumpIO::LLChainInfo::LLChainInfo() :
01141         mInit(false),
01142         mLock(0),
01143         mEOS(false)
01144 {
01145         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01146         mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
01147 }
01148 
01149 void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
01150 {
01151         LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01152         if(timeout > 0.0f)
01153         {
01154                 mTimer.start();
01155                 mTimer.reset();
01156                 mTimer.setTimerExpirySec(timeout);
01157         }
01158         else
01159         {
01160                 mTimer.stop();
01161         }
01162 }

Generated on Fri May 16 08:32:29 2008 for SecondLife by  doxygen 1.5.5