00001
00034 #include "linden_common.h"
00035 #include "llpumpio.h"
00036
00037 #include <map>
00038 #include <set>
00039 #include "apr-1/apr_poll.h"
00040
00041 #include "llapr.h"
00042 #include "llmemtype.h"
00043 #include "llstl.h"
00044
00045
00046
00047
00048 #if LL_LINUX
00049
00050
00051 #if LL_DEBUG_POLL_FILE_DESCRIPTORS
00052 #include "apr-1/apr_portable.h"
00053 #endif
00054 #endif
00055
00056 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00057 #include <typeinfo>
00058 #endif
00059
00060
00061
00062 #if LL_THREADS_APR
00063 static const S32 DEFAULT_POLL_TIMEOUT = 1000;
00064 #else
00065 static const S32 DEFAULT_POLL_TIMEOUT = 0;
00066 #endif
00067
00068
00069 const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
00070 extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
00071 extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085 void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
00086 {
00087 #if LL_DEBUG_POLL_FILE_DESCRIPTORS
00088 if(!poll)
00089 {
00090 lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
00091 return;
00092 }
00093 if(poll->desc.s)
00094 {
00095 apr_os_sock_t os_sock;
00096 if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
00097 {
00098 lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
00099 << " at " << poll->desc.s << llendl;
00100 }
00101 else
00102 {
00103 lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
00104 << " at " << poll->desc.s << llendl;
00105 }
00106 }
00107 else if(poll->desc.f)
00108 {
00109 apr_os_file_t os_file;
00110 if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
00111 {
00112 lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
00113 << " at " << poll->desc.f << llendl;
00114 }
00115 else
00116 {
00117 lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
00118 << " at " << poll->desc.f << llendl;
00119 }
00120 }
00121 else
00122 {
00123 lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
00124 }
00125 #endif
00126 }
00127
00131 class LLChainSleeper : public LLRunnable
00132 {
00133 public:
00134 static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
00135 {
00136 return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
00137 }
00138
00139 virtual void run(LLRunner* runner, S64 handle)
00140 {
00141 mPump->clearLock(mKey);
00142 }
00143
00144 protected:
00145 LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
00146 LLPumpIO* mPump;
00147 S32 mKey;
00148 };
00149
00150
00155 struct ll_delete_apr_pollset_fd_client_data
00156 {
00157 typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
00158 void operator()(const pipe_conditional_t& conditional)
00159 {
00160 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00161 S32* client_id = (S32*)conditional.second.client_data;
00162 delete client_id;
00163 }
00164 };
00165
00169 LLPumpIO::LLPumpIO(apr_pool_t* pool) :
00170 mState(LLPumpIO::NORMAL),
00171 mRebuildPollset(false),
00172 mPollset(NULL),
00173 mPollsetClientID(0),
00174 mNextLock(0),
00175 mPool(NULL),
00176 mCurrentPool(NULL),
00177 mCurrentPoolReallocCount(0),
00178 mChainsMutex(NULL),
00179 mCallbackMutex(NULL)
00180 {
00181 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00182 initialize(pool);
00183 }
00184
00185 LLPumpIO::~LLPumpIO()
00186 {
00187 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00188 cleanup();
00189 }
00190
00191 bool LLPumpIO::prime(apr_pool_t* pool)
00192 {
00193 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00194 cleanup();
00195 initialize(pool);
00196 return ((pool == NULL) ? false : true);
00197 }
00198
00199 bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
00200 {
00201 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00202 if(chain.empty()) return false;
00203
00204 #if LL_THREADS_APR
00205 LLScopedLock lock(mChainsMutex);
00206 #endif
00207 LLChainInfo info;
00208 info.setTimeoutSeconds(timeout);
00209 info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
00210 LLLinkInfo link;
00211 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00212 lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
00213 << typeid(*(chain[0])).name() << "'" << llendl;
00214 #else
00215 lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
00216 #endif
00217 chain_t::const_iterator it = chain.begin();
00218 chain_t::const_iterator end = chain.end();
00219 for(; it != end; ++it)
00220 {
00221 link.mPipe = (*it);
00222 link.mChannels = info.mData->nextChannel();
00223 info.mChainLinks.push_back(link);
00224 }
00225 mPendingChains.push_back(info);
00226 return true;
00227 }
00228
00229 bool LLPumpIO::addChain(
00230 const LLPumpIO::links_t& links,
00231 LLIOPipe::buffer_ptr_t data,
00232 LLSD context,
00233 F32 timeout)
00234 {
00235 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00236
00237
00238
00239
00240 if(!data) return false;
00241 if(links.empty()) return false;
00242
00243 #if LL_THREADS_APR
00244 LLScopedLock lock(mChainsMutex);
00245 #endif
00246 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00247 lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
00248 << typeid(*(links[0].mPipe)).name() << "'" << llendl;
00249 #else
00250 lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
00251 #endif
00252 LLChainInfo info;
00253 info.setTimeoutSeconds(timeout);
00254 info.mChainLinks = links;
00255 info.mData = data;
00256 info.mContext = context;
00257 mPendingChains.push_back(info);
00258 return true;
00259 }
00260
00261 bool LLPumpIO::setTimeoutSeconds(F32 timeout)
00262 {
00263
00264 if(current_chain_t() == mCurrentChain)
00265 {
00266 return false;
00267 }
00268 (*mCurrentChain).setTimeoutSeconds(timeout);
00269 return true;
00270 }
00271
00272 static std::string events_2_string(apr_int16_t events)
00273 {
00274 std::ostringstream ostr;
00275 if(events & APR_POLLIN)
00276 {
00277 ostr << "read,";
00278 }
00279 if(events & APR_POLLPRI)
00280 {
00281 ostr << "priority,";
00282 }
00283 if(events & APR_POLLOUT)
00284 {
00285 ostr << "write,";
00286 }
00287 if(events & APR_POLLERR)
00288 {
00289 ostr << "error,";
00290 }
00291 if(events & APR_POLLHUP)
00292 {
00293 ostr << "hangup,";
00294 }
00295 if(events & APR_POLLNVAL)
00296 {
00297 ostr << "invalid,";
00298 }
00299 return chop_tail_copy(ostr.str(), 1);
00300 }
00301
00302 bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
00303 {
00304 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00305 if(!pipe) return false;
00306 ll_debug_poll_fd("Set conditional", poll);
00307
00308 lldebugs << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null")
00309 << ") "
00310 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00311 << "on pipe " << typeid(*pipe).name()
00312 #endif
00313 << " at " << pipe << llendl;
00314
00315
00316 LLIOPipe::ptr_t pipe_ptr(pipe);
00317 LLChainInfo::conditionals_t::iterator it;
00318 it = (*mCurrentChain).mDescriptors.begin();
00319 while(it != (*mCurrentChain).mDescriptors.end())
00320 {
00321 LLChainInfo::pipe_conditional_t& value = (*it);
00322 if(pipe_ptr == value.first)
00323 {
00324 ll_delete_apr_pollset_fd_client_data()(value);
00325 it = (*mCurrentChain).mDescriptors.erase(it);
00326 mRebuildPollset = true;
00327 }
00328 else
00329 {
00330 ++it;
00331 }
00332 }
00333
00334 if(!poll)
00335 {
00336 mRebuildPollset = true;
00337 return true;
00338 }
00339 LLChainInfo::pipe_conditional_t value;
00340 value.first = pipe_ptr;
00341 value.second = *poll;
00342 value.second.rtnevents = 0;
00343 if(!poll->p)
00344 {
00345
00346
00347
00348 value.second.p = mPool;
00349 }
00350 value.second.client_data = new S32(++mPollsetClientID);
00351 (*mCurrentChain).mDescriptors.push_back(value);
00352 mRebuildPollset = true;
00353 return true;
00354 }
00355
00356 S32 LLPumpIO::setLock()
00357 {
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368 if(current_chain_t() == mCurrentChain)
00369 {
00370 return 0;
00371 }
00372
00373
00374 if(++mNextLock <= 0)
00375 {
00376 mNextLock = 1;
00377 }
00378
00379
00380 (*mCurrentChain).mLock = mNextLock;
00381 return mNextLock;
00382 }
00383
00384 void LLPumpIO::clearLock(S32 key)
00385 {
00386
00387
00388
00389
00390
00391
00392 #if LL_THREADS_APR
00393 LLScopedLock lock(mChainsMutex);
00394 #endif
00395 mClearLocks.insert(key);
00396 }
00397
00398 bool LLPumpIO::sleepChain(F64 seconds)
00399 {
00400
00401
00402
00403 if(seconds <= 0.0) return false;
00404 S32 key = setLock();
00405 if(!key) return false;
00406 LLRunner::run_handle_t handle = mRunner.addRunnable(
00407 LLChainSleeper::build(this, key),
00408 LLRunner::RUN_IN,
00409 seconds);
00410 if(0 == handle) return false;
00411 return true;
00412 }
00413
00414 bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
00415 {
00416 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00417 if(current_chain_t() == mCurrentChain)
00418 {
00419 return false;
00420 }
00421 std::copy(
00422 (*mCurrentChain).mChainLinks.begin(),
00423 (*mCurrentChain).mChainLinks.end(),
00424 std::back_insert_iterator<links_t>(links));
00425 return true;
00426 }
00427
00428 void LLPumpIO::pump()
00429 {
00430 pump(DEFAULT_POLL_TIMEOUT);
00431 }
00432
00433
00434 void LLPumpIO::pump(const S32& poll_timeout)
00435 {
00436 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00437 LLFastTimer t1(LLFastTimer::FTM_PUMP);
00438
00439
00440
00441 mRunner.run();
00442
00443
00444
00445 PUMP_DEBUG;
00446 if(true)
00447 {
00448 #if LL_THREADS_APR
00449 LLScopedLock lock(mChainsMutex);
00450 #endif
00451
00452 if(PAUSING == mState)
00453 {
00454 mState = PAUSED;
00455 }
00456 if(PAUSED == mState)
00457 {
00458 return;
00459 }
00460
00461 PUMP_DEBUG;
00462
00463 if(!mPendingChains.empty())
00464 {
00465 PUMP_DEBUG;
00466
00467 std::copy(
00468 mPendingChains.begin(),
00469 mPendingChains.end(),
00470 std::back_insert_iterator<running_chains_t>(mRunningChains));
00471 mPendingChains.clear();
00472 PUMP_DEBUG;
00473 }
00474
00475
00476
00477 if(!mClearLocks.empty())
00478 {
00479 PUMP_DEBUG;
00480 running_chains_t::iterator it = mRunningChains.begin();
00481 running_chains_t::iterator end = mRunningChains.end();
00482 std::set<S32>::iterator not_cleared = mClearLocks.end();
00483 for(; it != end; ++it)
00484 {
00485 if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
00486 {
00487 (*it).mLock = 0;
00488 }
00489 }
00490 PUMP_DEBUG;
00491 mClearLocks.clear();
00492 }
00493 }
00494
00495 PUMP_DEBUG;
00496
00497 if(mRebuildPollset)
00498 {
00499 PUMP_DEBUG;
00500 rebuildPollset();
00501 mRebuildPollset = false;
00502 }
00503
00504
00505
00506
00507 PUMP_DEBUG;
00508 typedef std::map<S32, S32> signal_client_t;
00509 signal_client_t signalled_client;
00510 const apr_pollfd_t* poll_fd = NULL;
00511 if(mPollset)
00512 {
00513 PUMP_DEBUG;
00514
00515 S32 count = 0;
00516 S32 client_id = 0;
00517 apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
00518 PUMP_DEBUG;
00519 for(S32 ii = 0; ii < count; ++ii)
00520 {
00521 ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
00522 client_id = *((S32*)poll_fd[ii].client_data);
00523 signalled_client[client_id] = ii;
00524 }
00525 PUMP_DEBUG;
00526 }
00527
00528 PUMP_DEBUG;
00529
00530 signal_client_t::iterator not_signalled = signalled_client.end();
00531
00532
00533
00534 running_chains_t::iterator run_chain = mRunningChains.begin();
00535 bool process_this_chain = false;
00536 for(; run_chain != mRunningChains.end(); )
00537 {
00538 PUMP_DEBUG;
00539 if((*run_chain).mInit
00540 && (*run_chain).mTimer.getStarted()
00541 && (*run_chain).mTimer.hasExpired())
00542 {
00543 PUMP_DEBUG;
00544 if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
00545 {
00546
00547
00548
00549 if((*run_chain).mTimer.getStarted()
00550 && (*run_chain).mTimer.hasExpired())
00551 {
00552 PUMP_DEBUG;
00553 llinfos << "Error handler forgot to reset timeout. "
00554 << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
00555 << " seconds." << llendl;
00556 (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
00557 }
00558 }
00559 else
00560 {
00561 PUMP_DEBUG;
00562
00563
00564 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00565 lldebugs << "Removing chain "
00566 << (*run_chain).mChainLinks[0].mPipe
00567 << " '"
00568 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00569 << "' because it timed out." << llendl;
00570 #else
00571
00572
00573
00574 #endif
00575 run_chain = mRunningChains.erase(run_chain);
00576 continue;
00577 }
00578 }
00579 PUMP_DEBUG;
00580 if((*run_chain).mLock)
00581 {
00582 ++run_chain;
00583 continue;
00584 }
00585 PUMP_DEBUG;
00586 mCurrentChain = run_chain;
00587 if((*run_chain).mDescriptors.empty())
00588 {
00589
00590 process_this_chain = true;
00591
00592 }
00593 else
00594 {
00595 PUMP_DEBUG;
00596
00597
00598
00599
00600 process_this_chain = false;
00601 if(!signalled_client.empty())
00602 {
00603 PUMP_DEBUG;
00604 LLChainInfo::conditionals_t::iterator it;
00605 it = (*run_chain).mDescriptors.begin();
00606 LLChainInfo::conditionals_t::iterator end;
00607 end = (*run_chain).mDescriptors.end();
00608 S32 client_id = 0;
00609 signal_client_t::iterator signal;
00610 for(; it != end; ++it)
00611 {
00612 PUMP_DEBUG;
00613 client_id = *((S32*)((*it).second.client_data));
00614 signal = signalled_client.find(client_id);
00615 if (signal == not_signalled) continue;
00616 static const apr_int16_t POLL_CHAIN_ERROR =
00617 APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
00618 const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
00619 if(poll->rtnevents & POLL_CHAIN_ERROR)
00620 {
00621
00622
00623
00624
00625
00626
00627
00628
00629 LLIOPipe::EStatus error_status;
00630 if(poll->rtnevents & APR_POLLHUP)
00631 error_status = LLIOPipe::STATUS_LOST_CONNECTION;
00632 else
00633 error_status = LLIOPipe::STATUS_ERROR;
00634 if(handleChainError(*run_chain, error_status)) break;
00635 ll_debug_poll_fd("Removing pipe", poll);
00636 llwarns << "Removing pipe "
00637 << (*run_chain).mChainLinks[0].mPipe
00638 << " '"
00639 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00640 << typeid(
00641 *((*run_chain).mChainLinks[0].mPipe)).name()
00642 #endif
00643 << "' because: "
00644 << events_2_string(poll->rtnevents)
00645 << llendl;
00646 (*run_chain).mHead = (*run_chain).mChainLinks.end();
00647 break;
00648 }
00649
00650
00651
00652 process_this_chain = true;
00653 break;
00654 }
00655 }
00656 }
00657 if(process_this_chain)
00658 {
00659 PUMP_DEBUG;
00660 if(!((*run_chain).mInit))
00661 {
00662 (*run_chain).mHead = (*run_chain).mChainLinks.begin();
00663 (*run_chain).mInit = true;
00664 }
00665 PUMP_DEBUG;
00666 processChain(*run_chain);
00667 }
00668
00669 PUMP_DEBUG;
00670 if((*run_chain).mHead == (*run_chain).mChainLinks.end())
00671 {
00672 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00673 lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
00674 << " '"
00675 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00676 << "' because we reached the end." << llendl;
00677 #else
00678
00679
00680 #endif
00681
00682 PUMP_DEBUG;
00683
00684
00685 std::for_each(
00686 (*run_chain).mDescriptors.begin(),
00687 (*run_chain).mDescriptors.end(),
00688 ll_delete_apr_pollset_fd_client_data());
00689 run_chain = mRunningChains.erase(run_chain);
00690
00691
00692 mRebuildPollset = true;
00693 }
00694 else
00695 {
00696 PUMP_DEBUG;
00697
00698
00699 ++run_chain;
00700 }
00701 }
00702
00703 PUMP_DEBUG;
00704
00705 mCurrentChain = current_chain_t();
00706 END_PUMP_DEBUG;
00707 }
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721 bool LLPumpIO::respond(LLIOPipe* pipe)
00722 {
00723 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00724 if(NULL == pipe) return false;
00725
00726 #if LL_THREADS_APR
00727 LLScopedLock lock(mCallbackMutex);
00728 #endif
00729 LLChainInfo info;
00730 LLLinkInfo link;
00731 link.mPipe = pipe;
00732 info.mChainLinks.push_back(link);
00733 mPendingCallbacks.push_back(info);
00734 return true;
00735 }
00736
00737 bool LLPumpIO::respond(
00738 const links_t& links,
00739 LLIOPipe::buffer_ptr_t data,
00740 LLSD context)
00741 {
00742 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00743
00744
00745 if(!data) return false;
00746 if(links.empty()) return false;
00747
00748 #if LL_THREADS_APR
00749 LLScopedLock lock(mCallbackMutex);
00750 #endif
00751
00752
00753 LLChainInfo info;
00754 info.mChainLinks = links;
00755 info.mData = data;
00756 info.mContext = context;
00757 mPendingCallbacks.push_back(info);
00758 return true;
00759 }
00760
00761 void LLPumpIO::callback()
00762 {
00763 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00764
00765 if(true)
00766 {
00767 #if LL_THREADS_APR
00768 LLScopedLock lock(mCallbackMutex);
00769 #endif
00770 std::copy(
00771 mPendingCallbacks.begin(),
00772 mPendingCallbacks.end(),
00773 std::back_insert_iterator<callbacks_t>(mCallbacks));
00774 mPendingCallbacks.clear();
00775 }
00776 if(!mCallbacks.empty())
00777 {
00778 callbacks_t::iterator it = mCallbacks.begin();
00779 callbacks_t::iterator end = mCallbacks.end();
00780 for(; it != end; ++it)
00781 {
00782
00783 (*it).mHead = (*it).mChainLinks.begin();
00784 (*it).mInit = true;
00785 (*it).mEOS = true;
00786 processChain(*it);
00787 }
00788 mCallbacks.clear();
00789 }
00790 }
00791
00792 void LLPumpIO::control(LLPumpIO::EControl op)
00793 {
00794 #if LL_THREADS_APR
00795 LLScopedLock lock(mChainsMutex);
00796 #endif
00797 switch(op)
00798 {
00799 case PAUSE:
00800 mState = PAUSING;
00801 break;
00802 case RESUME:
00803 mState = NORMAL;
00804 break;
00805 default:
00806
00807 break;
00808 }
00809 }
00810
00811 void LLPumpIO::initialize(apr_pool_t* pool)
00812 {
00813 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00814 if(!pool) return;
00815 #if LL_THREADS_APR
00816
00817 apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00818 apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00819 #endif
00820 mPool = pool;
00821 }
00822
00823 void LLPumpIO::cleanup()
00824 {
00825 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00826 #if LL_THREADS_APR
00827 if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
00828 if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
00829 #endif
00830 mChainsMutex = NULL;
00831 mCallbackMutex = NULL;
00832 if(mPollset)
00833 {
00834
00835 apr_pollset_destroy(mPollset);
00836 mPollset = NULL;
00837 }
00838 if(mCurrentPool)
00839 {
00840 apr_pool_destroy(mCurrentPool);
00841 mCurrentPool = NULL;
00842 }
00843 mPool = NULL;
00844 }
00845
00846 void LLPumpIO::rebuildPollset()
00847 {
00848 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00849
00850 if(mPollset)
00851 {
00852
00853 apr_pollset_destroy(mPollset);
00854 mPollset = NULL;
00855 }
00856 U32 size = 0;
00857 running_chains_t::iterator run_it = mRunningChains.begin();
00858 running_chains_t::iterator run_end = mRunningChains.end();
00859 for(; run_it != run_end; ++run_it)
00860 {
00861 size += (*run_it).mDescriptors.size();
00862 }
00863
00864 if(size)
00865 {
00866
00867 const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
00868 if(mCurrentPool
00869 && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
00870 {
00871 apr_pool_destroy(mCurrentPool);
00872 mCurrentPool = NULL;
00873 mCurrentPoolReallocCount = 0;
00874 }
00875 if(!mCurrentPool)
00876 {
00877 apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
00878 (void)ll_apr_warn_status(status);
00879 }
00880
00881
00882 run_it = mRunningChains.begin();
00883 LLChainInfo::conditionals_t::iterator fd_it;
00884 LLChainInfo::conditionals_t::iterator fd_end;
00885 apr_pollset_create(&mPollset, size, mCurrentPool, 0);
00886 for(; run_it != run_end; ++run_it)
00887 {
00888 fd_it = (*run_it).mDescriptors.begin();
00889 fd_end = (*run_it).mDescriptors.end();
00890 for(; fd_it != fd_end; ++fd_it)
00891 {
00892 apr_pollset_add(mPollset, &((*fd_it).second));
00893 }
00894 }
00895 }
00896 }
00897
00898 void LLPumpIO::processChain(LLChainInfo& chain)
00899 {
00900 PUMP_DEBUG;
00901 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00902 LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
00903 links_t::iterator it = chain.mHead;
00904 links_t::iterator end = chain.mChainLinks.end();
00905 bool need_process_signaled = false;
00906 bool keep_going = true;
00907 do
00908 {
00909 #if LL_DEBUG_PROCESS_LINK
00910 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00911 llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
00912 << llendl;
00913 #else
00914 llinfos << "Processing link " << (*it).mPipe << "." << llendl;
00915 #endif
00916 #endif
00917 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
00918 if(chain.mData)
00919 {
00920 char* buf = NULL;
00921 S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
00922 if(bytes)
00923 {
00924 buf = new char[bytes + 1];
00925 chain.mData->readAfter(
00926 (*it).mChannels.in(),
00927 NULL,
00928 (U8*)buf,
00929 bytes);
00930 buf[bytes] = '\0';
00931 llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
00932 << buf << llendl;
00933 delete[] buf;
00934 buf = NULL;
00935 }
00936 else
00937 {
00938 llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
00939 << llendl;
00940 }
00941 }
00942 #endif
00943 PUMP_DEBUG;
00944 status = (*it).mPipe->process(
00945 (*it).mChannels,
00946 chain.mData,
00947 chain.mEOS,
00948 chain.mContext,
00949 this);
00950 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
00951 if(chain.mData)
00952 {
00953 char* buf = NULL;
00954 S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
00955 if(bytes)
00956 {
00957 buf = new char[bytes + 1];
00958 chain.mData->readAfter(
00959 (*it).mChannels.out(),
00960 NULL,
00961 (U8*)buf,
00962 bytes);
00963 buf[bytes] = '\0';
00964 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
00965 << buf << llendl;
00966 delete[] buf;
00967 buf = NULL;
00968 }
00969 else
00970 {
00971 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
00972 << llendl;
00973 }
00974 }
00975 #endif
00976
00977 #if LL_DEBUG_PROCESS_RETURN_VALUE
00978
00979
00980 if(LLIOPipe::isSuccess(status))
00981 {
00982 llinfos << "Pipe returned: '"
00983 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00984 << typeid(*((*it).mPipe)).name() << "':'"
00985 #endif
00986 << LLIOPipe::lookupStatusString(status) << "'" << llendl;
00987 }
00988 #endif
00989
00990 PUMP_DEBUG;
00991 switch(status)
00992 {
00993 case LLIOPipe::STATUS_OK:
00994
00995 break;
00996 case LLIOPipe::STATUS_STOP:
00997 PUMP_DEBUG;
00998 status = LLIOPipe::STATUS_OK;
00999 chain.mHead = end;
01000 keep_going = false;
01001 break;
01002 case LLIOPipe::STATUS_DONE:
01003 PUMP_DEBUG;
01004 status = LLIOPipe::STATUS_OK;
01005 chain.mHead = (it + 1);
01006 chain.mEOS = true;
01007 break;
01008 case LLIOPipe::STATUS_BREAK:
01009 PUMP_DEBUG;
01010 status = LLIOPipe::STATUS_OK;
01011 keep_going = false;
01012 break;
01013 case LLIOPipe::STATUS_NEED_PROCESS:
01014 PUMP_DEBUG;
01015 status = LLIOPipe::STATUS_OK;
01016 if(!need_process_signaled)
01017 {
01018 need_process_signaled = true;
01019 chain.mHead = it;
01020 }
01021 break;
01022 default:
01023 PUMP_DEBUG;
01024 if(LLIOPipe::isError(status))
01025 {
01026 llinfos << "Pump generated pipe err: '"
01027 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
01028 << typeid(*((*it).mPipe)).name() << "':'"
01029 #endif
01030 << LLIOPipe::lookupStatusString(status)
01031 << "'" << llendl;
01032 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
01033 if(chain.mData)
01034 {
01035 char* buf = NULL;
01036 S32 bytes = chain.mData->countAfter(
01037 (*it).mChannels.in(),
01038 NULL);
01039 if(bytes)
01040 {
01041 buf = new char[bytes + 1];
01042 chain.mData->readAfter(
01043 (*it).mChannels.in(),
01044 NULL,
01045 (U8*)buf,
01046 bytes);
01047 buf[bytes] = '\0';
01048 llinfos << "Input After Error: " << buf << llendl;
01049 delete[] buf;
01050 buf = NULL;
01051 }
01052 else
01053 {
01054 llinfos << "Input After Error: (null)" << llendl;
01055 }
01056 }
01057 else
01058 {
01059 llinfos << "Input After Error: (null)" << llendl;
01060 }
01061 #endif
01062 keep_going = false;
01063 chain.mHead = it;
01064 if(!handleChainError(chain, status))
01065 {
01066 chain.mHead = end;
01067 }
01068 }
01069 else
01070 {
01071 llinfos << "Unhandled status code: " << status << ":"
01072 << LLIOPipe::lookupStatusString(status) << llendl;
01073 }
01074 break;
01075 }
01076 PUMP_DEBUG;
01077 } while(keep_going && (++it != end));
01078 PUMP_DEBUG;
01079 }
01080
01081 bool LLPumpIO::handleChainError(
01082 LLChainInfo& chain,
01083 LLIOPipe::EStatus error)
01084 {
01085 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01086 links_t::reverse_iterator rit;
01087 if(chain.mHead == chain.mChainLinks.end())
01088 {
01089 rit = links_t::reverse_iterator(chain.mHead);
01090 }
01091 else
01092 {
01093 rit = links_t::reverse_iterator(chain.mHead + 1);
01094 }
01095
01096 links_t::reverse_iterator rend = chain.mChainLinks.rend();
01097 bool handled = false;
01098 bool keep_going = true;
01099 do
01100 {
01101 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
01102 lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
01103 << "." << llendl;
01104 #endif
01105 error = (*rit).mPipe->handleError(error, this);
01106 switch(error)
01107 {
01108 case LLIOPipe::STATUS_OK:
01109 handled = true;
01110 chain.mHead = rit.base();
01111 break;
01112 case LLIOPipe::STATUS_STOP:
01113 case LLIOPipe::STATUS_DONE:
01114 case LLIOPipe::STATUS_BREAK:
01115 case LLIOPipe::STATUS_NEED_PROCESS:
01116 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
01117 lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
01118 << " returned code to stop error handler." << llendl;
01119 #endif
01120 keep_going = false;
01121 break;
01122 default:
01123 if(LLIOPipe::isSuccess(error))
01124 {
01125 llinfos << "Unhandled status code: " << error << ":"
01126 << LLIOPipe::lookupStatusString(error) << llendl;
01127 error = LLIOPipe::STATUS_ERROR;
01128 keep_going = false;
01129 }
01130 break;
01131 }
01132 } while(keep_going && !handled && (++rit != rend));
01133 return handled;
01134 }
01135
01140 LLPumpIO::LLChainInfo::LLChainInfo() :
01141 mInit(false),
01142 mLock(0),
01143 mEOS(false)
01144 {
01145 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01146 mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
01147 }
01148
01149 void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
01150 {
01151 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01152 if(timeout > 0.0f)
01153 {
01154 mTimer.start();
01155 mTimer.reset();
01156 mTimer.setTimerExpirySec(timeout);
01157 }
01158 else
01159 {
01160 mTimer.stop();
01161 }
01162 }