00001
00034 #include "linden_common.h"
00035 #include "llpumpio.h"
00036
00037 #include <set>
00038 #include "apr-1/apr_poll.h"
00039
00040 #include "llapr.h"
00041 #include "llmemtype.h"
00042 #include "llstl.h"
00043
00044
00045
00046 #if LL_LINUX
00047 #define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
00048 #endif
00049
00050 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00051 #include <typeinfo>
00052 #endif
00053
00054
00055
00056 #if LL_THREADS_APR
00057 static const S32 DEFAULT_POLL_TIMEOUT = 1000;
00058 #else
00059 static const S32 DEFAULT_POLL_TIMEOUT = 0;
00060 #endif
00061
00062
00063 const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
00064 extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
00065 extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00079 class LLChainSleeper : public LLRunnable
00080 {
00081 public:
00082 static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
00083 {
00084 return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
00085 }
00086
00087 virtual void run(LLRunner* runner, S64 handle)
00088 {
00089 mPump->clearLock(mKey);
00090 }
00091
00092 protected:
00093 LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
00094 LLPumpIO* mPump;
00095 S32 mKey;
00096 };
00097
00098
00103 struct ll_delete_apr_pollset_fd_client_data
00104 {
00105 typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
00106 void operator()(const pipe_conditional_t& conditional)
00107 {
00108 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00109 S32* client_id = (S32*)conditional.second.client_data;
00110 delete client_id;
00111 }
00112 };
00113
00117 LLPumpIO::LLPumpIO(apr_pool_t* pool) :
00118 mState(LLPumpIO::NORMAL),
00119 mRebuildPollset(false),
00120 mPollset(NULL),
00121 mPollsetClientID(0),
00122 mNextLock(0),
00123 mPool(NULL),
00124 mCurrentPool(NULL),
00125 mCurrentPoolReallocCount(0),
00126 mChainsMutex(NULL),
00127 mCallbackMutex(NULL)
00128 {
00129 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00130 initialize(pool);
00131 }
00132
00133 LLPumpIO::~LLPumpIO()
00134 {
00135 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00136 cleanup();
00137 }
00138
00139 bool LLPumpIO::prime(apr_pool_t* pool)
00140 {
00141 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00142 cleanup();
00143 initialize(pool);
00144 return ((pool == NULL) ? false : true);
00145 }
00146
00147 bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
00148 {
00149 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00150 if(chain.empty()) return false;
00151
00152 #if LL_THREADS_APR
00153 LLScopedLock lock(mChainsMutex);
00154 #endif
00155 LLChainInfo info;
00156 info.setTimeoutSeconds(timeout);
00157 info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
00158 LLLinkInfo link;
00159 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00160 lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
00161 << typeid(*(chain[0])).name() << "'" << llendl;
00162 #else
00163 lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
00164 #endif
00165 chain_t::const_iterator it = chain.begin();
00166 chain_t::const_iterator end = chain.end();
00167 for(; it != end; ++it)
00168 {
00169 link.mPipe = (*it);
00170 link.mChannels = info.mData->nextChannel();
00171 info.mChainLinks.push_back(link);
00172 }
00173 mPendingChains.push_back(info);
00174 return true;
00175 }
00176
00177 bool LLPumpIO::addChain(
00178 const LLPumpIO::links_t& links,
00179 LLIOPipe::buffer_ptr_t data,
00180 LLSD context,
00181 F32 timeout)
00182 {
00183 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00184
00185
00186
00187
00188 if(!data) return false;
00189 if(links.empty()) return false;
00190
00191 #if LL_THREADS_APR
00192 LLScopedLock lock(mChainsMutex);
00193 #endif
00194 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00195 lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
00196 << typeid(*(links[0].mPipe)).name() << "'" << llendl;
00197 #else
00198 lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
00199 #endif
00200 LLChainInfo info;
00201 info.setTimeoutSeconds(timeout);
00202 info.mChainLinks = links;
00203 info.mData = data;
00204 info.mContext = context;
00205 mPendingChains.push_back(info);
00206 return true;
00207 }
00208
00209 bool LLPumpIO::setTimeoutSeconds(F32 timeout)
00210 {
00211
00212 if(current_chain_t() == mCurrentChain)
00213 {
00214 return false;
00215 }
00216 (*mCurrentChain).setTimeoutSeconds(timeout);
00217 return true;
00218 }
00219
00220 bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
00221 {
00222 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00223
00224 if(pipe)
00225 {
00226
00227 LLIOPipe::ptr_t pipe_ptr(pipe);
00228 LLChainInfo::conditionals_t::iterator it;
00229 it = (*mCurrentChain).mDescriptors.begin();
00230 while(it != (*mCurrentChain).mDescriptors.end())
00231 {
00232 LLChainInfo::pipe_conditional_t& value = (*it);
00233 if(pipe_ptr == value.first)
00234 {
00235 ll_delete_apr_pollset_fd_client_data()(value);
00236 it = (*mCurrentChain).mDescriptors.erase(it);
00237 mRebuildPollset = true;
00238 }
00239 else
00240 {
00241 ++it;
00242 }
00243 }
00244
00245 if(poll)
00246 {
00247 LLChainInfo::pipe_conditional_t value;
00248 value.first = pipe_ptr;
00249 value.second = *poll;
00250 if(!poll->p)
00251 {
00252
00253
00254
00255 value.second.p = mPool;
00256 }
00257 value.second.client_data = new S32(++mPollsetClientID);
00258 (*mCurrentChain).mDescriptors.push_back(value);
00259 mRebuildPollset = true;
00260 }
00261 return true;
00262 }
00263 return false;
00264 }
00265
00266 S32 LLPumpIO::setLock()
00267 {
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278 if(current_chain_t() == mCurrentChain)
00279 {
00280 return 0;
00281 }
00282
00283
00284 if(++mNextLock <= 0)
00285 {
00286 mNextLock = 1;
00287 }
00288
00289
00290 (*mCurrentChain).mLock = mNextLock;
00291 return mNextLock;
00292 }
00293
00294 void LLPumpIO::clearLock(S32 key)
00295 {
00296
00297
00298
00299
00300
00301
00302 #if LL_THREADS_APR
00303 LLScopedLock lock(mChainsMutex);
00304 #endif
00305 mClearLocks.insert(key);
00306 }
00307
00308 bool LLPumpIO::sleepChain(F64 seconds)
00309 {
00310
00311
00312
00313 if(seconds <= 0.0) return false;
00314 S32 key = setLock();
00315 if(!key) return false;
00316 LLRunner::run_handle_t handle = mRunner.addRunnable(
00317 LLChainSleeper::build(this, key),
00318 LLRunner::RUN_IN,
00319 seconds);
00320 if(0 == handle) return false;
00321 return true;
00322 }
00323
00324 bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
00325 {
00326 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00327 if(current_chain_t() == mCurrentChain)
00328 {
00329 return false;
00330 }
00331 std::copy(
00332 (*mCurrentChain).mChainLinks.begin(),
00333 (*mCurrentChain).mChainLinks.end(),
00334 std::back_insert_iterator<links_t>(links));
00335 return true;
00336 }
00337
00338 void LLPumpIO::pump()
00339 {
00340 pump(DEFAULT_POLL_TIMEOUT);
00341 }
00342
00343
00344 void LLPumpIO::pump(const S32& poll_timeout)
00345 {
00346 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00347 LLFastTimer t1(LLFastTimer::FTM_PUMP);
00348
00349
00350
00351 mRunner.run();
00352
00353
00354
00355 PUMP_DEBUG;
00356 if(true)
00357 {
00358 #if LL_THREADS_APR
00359 LLScopedLock lock(mChainsMutex);
00360 #endif
00361
00362 if(PAUSING == mState)
00363 {
00364 mState = PAUSED;
00365 }
00366 if(PAUSED == mState)
00367 {
00368 return;
00369 }
00370
00371 PUMP_DEBUG;
00372
00373 if(!mPendingChains.empty())
00374 {
00375 PUMP_DEBUG;
00376
00377 std::copy(
00378 mPendingChains.begin(),
00379 mPendingChains.end(),
00380 std::back_insert_iterator<running_chains_t>(mRunningChains));
00381 mPendingChains.clear();
00382 PUMP_DEBUG;
00383 }
00384
00385
00386
00387 if(!mClearLocks.empty())
00388 {
00389 PUMP_DEBUG;
00390 running_chains_t::iterator it = mRunningChains.begin();
00391 running_chains_t::iterator end = mRunningChains.end();
00392 std::set<S32>::iterator not_cleared = mClearLocks.end();
00393 for(; it != end; ++it)
00394 {
00395 if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
00396 {
00397 (*it).mLock = 0;
00398 }
00399 }
00400 PUMP_DEBUG;
00401 mClearLocks.clear();
00402 }
00403 }
00404
00405 PUMP_DEBUG;
00406
00407 if(mRebuildPollset)
00408 {
00409 PUMP_DEBUG;
00410 rebuildPollset();
00411 mRebuildPollset = false;
00412 }
00413
00414
00415
00416
00417 PUMP_DEBUG;
00418 typedef std::set<S32> signal_client_t;
00419 signal_client_t signalled_client;
00420 if(mPollset)
00421 {
00422 PUMP_DEBUG;
00423
00424 S32 count = 0;
00425 S32 client_id = 0;
00426 const apr_pollfd_t* poll_fd = NULL;
00427 apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
00428 PUMP_DEBUG;
00429 for(S32 i = 0; i < count; ++i)
00430 {
00431 client_id = *((S32*)poll_fd[i].client_data);
00432 signalled_client.insert(client_id);
00433 }
00434 PUMP_DEBUG;
00435 }
00436
00437 PUMP_DEBUG;
00438
00439 signal_client_t::iterator not_signalled = signalled_client.end();
00440
00441
00442
00443 running_chains_t::iterator run_chain = mRunningChains.begin();
00444 bool process_this_chain = false;
00445 for(; run_chain != mRunningChains.end(); )
00446 {
00447 PUMP_DEBUG;
00448 if((*run_chain).mInit
00449 && (*run_chain).mTimer.getStarted()
00450 && (*run_chain).mTimer.hasExpired())
00451 {
00452 PUMP_DEBUG;
00453 if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
00454 {
00455
00456
00457
00458 if((*run_chain).mTimer.getStarted()
00459 && (*run_chain).mTimer.hasExpired())
00460 {
00461 PUMP_DEBUG;
00462 llinfos << "Error handler forgot to reset timeout. "
00463 << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
00464 << " seconds." << llendl;
00465 (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
00466 }
00467 }
00468 else
00469 {
00470 PUMP_DEBUG;
00471
00472
00473 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00474 lldebugs << "Removing chain "
00475 << (*run_chain).mChainLinks[0].mPipe
00476 << " '"
00477 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00478 << "' because it timed out." << llendl;
00479 #else
00480
00481
00482
00483 #endif
00484 run_chain = mRunningChains.erase(run_chain);
00485 continue;
00486 }
00487 }
00488 PUMP_DEBUG;
00489 if((*run_chain).mLock)
00490 {
00491 ++run_chain;
00492 continue;
00493 }
00494 PUMP_DEBUG;
00495 mCurrentChain = run_chain;
00496 if((*run_chain).mDescriptors.empty())
00497 {
00498
00499 process_this_chain = true;
00500
00501 }
00502 else
00503 {
00504 PUMP_DEBUG;
00505
00506
00507
00508
00509 process_this_chain = false;
00510 if(!signalled_client.empty())
00511 {
00512 PUMP_DEBUG;
00513 LLChainInfo::conditionals_t::iterator it;
00514 it = (*run_chain).mDescriptors.begin();
00515 LLChainInfo::conditionals_t::iterator end;
00516 end = (*run_chain).mDescriptors.end();
00517 S32 client_id = 0;
00518 for(; it != end; ++it)
00519 {
00520 PUMP_DEBUG;
00521 client_id = *((S32*)((*it).second.client_data));
00522 if(signalled_client.find(client_id) != not_signalled)
00523 {
00524 process_this_chain = true;
00525 break;
00526 }
00527
00528 }
00529 }
00530 }
00531 if(process_this_chain)
00532 {
00533 PUMP_DEBUG;
00534 if(!((*run_chain).mInit))
00535 {
00536 (*run_chain).mHead = (*run_chain).mChainLinks.begin();
00537 (*run_chain).mInit = true;
00538 }
00539 PUMP_DEBUG;
00540 processChain(*run_chain);
00541 }
00542
00543 PUMP_DEBUG;
00544 if((*run_chain).mHead == (*run_chain).mChainLinks.end())
00545 {
00546 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00547 lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
00548 << " '"
00549 << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
00550 << "' because we reached the end." << llendl;
00551 #else
00552
00553
00554 #endif
00555
00556 PUMP_DEBUG;
00557
00558
00559 std::for_each(
00560 (*run_chain).mDescriptors.begin(),
00561 (*run_chain).mDescriptors.end(),
00562 ll_delete_apr_pollset_fd_client_data());
00563 run_chain = mRunningChains.erase(run_chain);
00564
00565
00566 mRebuildPollset = true;
00567 }
00568 else
00569 {
00570 PUMP_DEBUG;
00571
00572
00573 ++run_chain;
00574 }
00575 }
00576
00577 PUMP_DEBUG;
00578
00579 mCurrentChain = current_chain_t();
00580 END_PUMP_DEBUG;
00581 }
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595 bool LLPumpIO::respond(LLIOPipe* pipe)
00596 {
00597 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00598 if(NULL == pipe) return false;
00599
00600 #if LL_THREADS_APR
00601 LLScopedLock lock(mCallbackMutex);
00602 #endif
00603 LLChainInfo info;
00604 LLLinkInfo link;
00605 link.mPipe = pipe;
00606 info.mChainLinks.push_back(link);
00607 mPendingCallbacks.push_back(info);
00608 return true;
00609 }
00610
00611 bool LLPumpIO::respond(
00612 const links_t& links,
00613 LLIOPipe::buffer_ptr_t data,
00614 LLSD context)
00615 {
00616 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00617
00618
00619 if(!data) return false;
00620 if(links.empty()) return false;
00621
00622 #if LL_THREADS_APR
00623 LLScopedLock lock(mCallbackMutex);
00624 #endif
00625
00626
00627 LLChainInfo info;
00628 info.mChainLinks = links;
00629 info.mData = data;
00630 info.mContext = context;
00631 mPendingCallbacks.push_back(info);
00632 return true;
00633 }
00634
00635 void LLPumpIO::callback()
00636 {
00637 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00638
00639 if(true)
00640 {
00641 #if LL_THREADS_APR
00642 LLScopedLock lock(mCallbackMutex);
00643 #endif
00644 std::copy(
00645 mPendingCallbacks.begin(),
00646 mPendingCallbacks.end(),
00647 std::back_insert_iterator<callbacks_t>(mCallbacks));
00648 mPendingCallbacks.clear();
00649 }
00650 if(!mCallbacks.empty())
00651 {
00652 callbacks_t::iterator it = mCallbacks.begin();
00653 callbacks_t::iterator end = mCallbacks.end();
00654 for(; it != end; ++it)
00655 {
00656
00657 (*it).mHead = (*it).mChainLinks.begin();
00658 (*it).mInit = true;
00659 (*it).mEOS = true;
00660 processChain(*it);
00661 }
00662 mCallbacks.clear();
00663 }
00664 }
00665
00666 void LLPumpIO::control(LLPumpIO::EControl op)
00667 {
00668 #if LL_THREADS_APR
00669 LLScopedLock lock(mChainsMutex);
00670 #endif
00671 switch(op)
00672 {
00673 case PAUSE:
00674 mState = PAUSING;
00675 break;
00676 case RESUME:
00677 mState = NORMAL;
00678 break;
00679 default:
00680
00681 break;
00682 }
00683 }
00684
00685 void LLPumpIO::initialize(apr_pool_t* pool)
00686 {
00687 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00688 if(!pool) return;
00689 #if LL_THREADS_APR
00690
00691 apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00692 apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, pool);
00693 #endif
00694 mPool = pool;
00695 }
00696
00697 void LLPumpIO::cleanup()
00698 {
00699 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00700 #if LL_THREADS_APR
00701 if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
00702 if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
00703 #endif
00704 mChainsMutex = NULL;
00705 mCallbackMutex = NULL;
00706 if(mPollset)
00707 {
00708
00709 apr_pollset_destroy(mPollset);
00710 mPollset = NULL;
00711 }
00712 if(mCurrentPool)
00713 {
00714 apr_pool_destroy(mCurrentPool);
00715 mCurrentPool = NULL;
00716 }
00717 mPool = NULL;
00718 }
00719
00720 void LLPumpIO::rebuildPollset()
00721 {
00722 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00723
00724 if(mPollset)
00725 {
00726
00727 apr_pollset_destroy(mPollset);
00728 mPollset = NULL;
00729 }
00730 U32 size = 0;
00731 running_chains_t::iterator run_it = mRunningChains.begin();
00732 running_chains_t::iterator run_end = mRunningChains.end();
00733 for(; run_it != run_end; ++run_it)
00734 {
00735 size += (*run_it).mDescriptors.size();
00736 }
00737
00738 if(size)
00739 {
00740
00741 const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
00742 if(mCurrentPool
00743 && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
00744 {
00745 apr_pool_destroy(mCurrentPool);
00746 mCurrentPool = NULL;
00747 mCurrentPoolReallocCount = 0;
00748 }
00749 if(!mCurrentPool)
00750 {
00751 apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
00752 (void)ll_apr_warn_status(status);
00753 }
00754
00755
00756 run_it = mRunningChains.begin();
00757 LLChainInfo::conditionals_t::iterator fd_it;
00758 LLChainInfo::conditionals_t::iterator fd_end;
00759 apr_pollset_create(&mPollset, size, mCurrentPool, 0);
00760 for(; run_it != run_end; ++run_it)
00761 {
00762 fd_it = (*run_it).mDescriptors.begin();
00763 fd_end = (*run_it).mDescriptors.end();
00764 for(; fd_it != fd_end; ++fd_it)
00765 {
00766 apr_pollset_add(mPollset, &((*fd_it).second));
00767 }
00768 }
00769 }
00770 }
00771
00772 void LLPumpIO::processChain(LLChainInfo& chain)
00773 {
00774 PUMP_DEBUG;
00775 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00776 LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
00777 links_t::iterator it = chain.mHead;
00778 links_t::iterator end = chain.mChainLinks.end();
00779 bool need_process_signaled = false;
00780 bool keep_going = true;
00781 do
00782 {
00783 #if LL_DEBUG_PROCESS_LINK
00784 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00785 llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
00786 << llendl;
00787 #else
00788 llinfos << "Processing link " << (*it).mPipe << "." << llendl;
00789 #endif
00790 #endif
00791 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
00792 if(chain.mData)
00793 {
00794 char* buf = NULL;
00795 S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
00796 if(bytes)
00797 {
00798 buf = new char[bytes + 1];
00799 chain.mData->readAfter(
00800 (*it).mChannels.in(),
00801 NULL,
00802 (U8*)buf,
00803 bytes);
00804 buf[bytes] = '\0';
00805 llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
00806 << buf << llendl;
00807 delete[] buf;
00808 buf = NULL;
00809 }
00810 else
00811 {
00812 llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
00813 << llendl;
00814 }
00815 }
00816 #endif
00817 PUMP_DEBUG;
00818 status = (*it).mPipe->process(
00819 (*it).mChannels,
00820 chain.mData,
00821 chain.mEOS,
00822 chain.mContext,
00823 this);
00824 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
00825 if(chain.mData)
00826 {
00827 char* buf = NULL;
00828 S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
00829 if(bytes)
00830 {
00831 buf = new char[bytes + 1];
00832 chain.mData->readAfter(
00833 (*it).mChannels.out(),
00834 NULL,
00835 (U8*)buf,
00836 bytes);
00837 buf[bytes] = '\0';
00838 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
00839 << buf << llendl;
00840 delete[] buf;
00841 buf = NULL;
00842 }
00843 else
00844 {
00845 llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
00846 << llendl;
00847 }
00848 }
00849 #endif
00850
00851 #if LL_DEBUG_PROCESS_RETURN_VALUE
00852
00853
00854 if(LLIOPipe::isSuccess(status))
00855 {
00856 llinfos << "Pipe returned: '"
00857 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00858 << typeid(*((*it).mPipe)).name() << "':'"
00859 #endif
00860 << LLIOPipe::lookupStatusString(status) << "'" << llendl;
00861 }
00862 #endif
00863
00864 PUMP_DEBUG;
00865 switch(status)
00866 {
00867 case LLIOPipe::STATUS_OK:
00868
00869 break;
00870 case LLIOPipe::STATUS_STOP:
00871 PUMP_DEBUG;
00872 status = LLIOPipe::STATUS_OK;
00873 chain.mHead = end;
00874 keep_going = false;
00875 break;
00876 case LLIOPipe::STATUS_DONE:
00877 PUMP_DEBUG;
00878 status = LLIOPipe::STATUS_OK;
00879 chain.mHead = (it + 1);
00880 chain.mEOS = true;
00881 break;
00882 case LLIOPipe::STATUS_BREAK:
00883 PUMP_DEBUG;
00884 status = LLIOPipe::STATUS_OK;
00885 keep_going = false;
00886 break;
00887 case LLIOPipe::STATUS_NEED_PROCESS:
00888 PUMP_DEBUG;
00889 status = LLIOPipe::STATUS_OK;
00890 if(!need_process_signaled)
00891 {
00892 need_process_signaled = true;
00893 chain.mHead = it;
00894 }
00895 break;
00896 default:
00897 PUMP_DEBUG;
00898 if(LLIOPipe::isError(status))
00899 {
00900 llinfos << "Pump generated pipe err: '"
00901 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00902 << typeid(*((*it).mPipe)).name() << "':'"
00903 #endif
00904 << LLIOPipe::lookupStatusString(status)
00905 << "'" << llendl;
00906 #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
00907 if(chain.mData)
00908 {
00909 char* buf = NULL;
00910 S32 bytes = chain.mData->countAfter(
00911 (*it).mChannels.in(),
00912 NULL);
00913 if(bytes)
00914 {
00915 buf = new char[bytes + 1];
00916 chain.mData->readAfter(
00917 (*it).mChannels.in(),
00918 NULL,
00919 (U8*)buf,
00920 bytes);
00921 buf[bytes] = '\0';
00922 llinfos << "Input After Error: " << buf << llendl;
00923 delete[] buf;
00924 buf = NULL;
00925 }
00926 else
00927 {
00928 llinfos << "Input After Error: (null)" << llendl;
00929 }
00930 }
00931 else
00932 {
00933 llinfos << "Input After Error: (null)" << llendl;
00934 }
00935 #endif
00936 keep_going = false;
00937 chain.mHead = it;
00938 if(!handleChainError(chain, status))
00939 {
00940 chain.mHead = end;
00941 }
00942 }
00943 else
00944 {
00945 llinfos << "Unhandled status code: " << status << ":"
00946 << LLIOPipe::lookupStatusString(status) << llendl;
00947 }
00948 break;
00949 }
00950 PUMP_DEBUG;
00951 } while(keep_going && (++it != end));
00952 PUMP_DEBUG;
00953 }
00954
00955 bool LLPumpIO::handleChainError(
00956 LLChainInfo& chain,
00957 LLIOPipe::EStatus error)
00958 {
00959 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
00960 links_t::reverse_iterator rit;
00961 if(chain.mHead == chain.mChainLinks.end())
00962 {
00963 rit = links_t::reverse_iterator(chain.mHead);
00964 }
00965 else
00966 {
00967 rit = links_t::reverse_iterator(chain.mHead + 1);
00968 }
00969
00970 links_t::reverse_iterator rend = chain.mChainLinks.rend();
00971 bool handled = false;
00972 bool keep_going = true;
00973 do
00974 {
00975 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00976 lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
00977 << "." << llendl;
00978 #endif
00979 error = (*rit).mPipe->handleError(error, this);
00980 switch(error)
00981 {
00982 case LLIOPipe::STATUS_OK:
00983 handled = true;
00984 chain.mHead = rit.base();
00985 break;
00986 case LLIOPipe::STATUS_STOP:
00987 case LLIOPipe::STATUS_DONE:
00988 case LLIOPipe::STATUS_BREAK:
00989 case LLIOPipe::STATUS_NEED_PROCESS:
00990 #if LL_DEBUG_PIPE_TYPE_IN_PUMP
00991 lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
00992 << " returned code to stop error handler." << llendl;
00993 #endif
00994 keep_going = false;
00995 break;
00996 default:
00997 if(LLIOPipe::isSuccess(error))
00998 {
00999 llinfos << "Unhandled status code: " << error << ":"
01000 << LLIOPipe::lookupStatusString(error) << llendl;
01001 error = LLIOPipe::STATUS_ERROR;
01002 keep_going = false;
01003 }
01004 break;
01005 }
01006 } while(keep_going && !handled && (++rit != rend));
01007 return handled;
01008 }
01009
01014 LLPumpIO::LLChainInfo::LLChainInfo() :
01015 mInit(false),
01016 mLock(0),
01017 mEOS(false)
01018 {
01019 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01020 mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
01021 }
01022
01023 void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
01024 {
01025 LLMemType m1(LLMemType::MTYPE_IO_PUMP);
01026 if(timeout > 0.0f)
01027 {
01028 mTimer.start();
01029 mTimer.reset();
01030 mTimer.setTimerExpirySec(timeout);
01031 }
01032 else
01033 {
01034 mTimer.stop();
01035 }
01036 }