#include <llpumpio.h>
Public Types | |
enum | EControl { PAUSE, RESUME } |
Enumeration to send commands to the pump. | |
typedef std::vector < LLIOPipe::ptr_t > | chain_t |
Typedef for having a chain of pipes. | |
typedef std::vector< LLLinkInfo > | links_t |
Typedef for having a chain of LLLinkInfo instances. | |
Public Member Functions | |
LLPumpIO (apr_pool_t *pool) | |
Constructor. | |
~LLPumpIO () | |
Destructor. | |
bool | prime (apr_pool_t *pool) |
Prepare this pump for usage. | |
bool | addChain (const chain_t &chain, F32 timeout) |
Add a chain to this pump and process in the next cycle. | |
bool | addChain (const links_t &links, LLIOPipe::buffer_ptr_t data, LLSD context, F32 timeout) |
Add a chain to this pump and process in the next cycle. | |
bool | setTimeoutSeconds (F32 timeout) |
Set or clear a timeout for the running chain. | |
bool | setConditional (LLIOPipe *pipe, const apr_pollfd_t *poll) |
Set up file descriptors for the running chain. | |
S32 | setLock () |
Lock the current chain. | |
void | clearLock (S32 key) |
Clears the identified lock. | |
bool | sleepChain (F64 seconds) |
Stop processing a chain for a while. | |
bool | copyCurrentLinkInfo (links_t &links) const |
Copy the currently running chain link info. | |
void | pump (const S32 &poll_timeout) |
Call this method to call process on all running chains. | |
void | pump () |
bool | respond (LLIOPipe *pipe) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue. | |
bool | respond (const links_t &links, LLIOPipe::buffer_ptr_t data, LLSD context) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue. | |
void | callback () |
Run through the callback queue and call process(). | |
void | control (EControl op) |
Send a command to the pump. | |
running_chains_t::size_type | runningChains () const |
Return number of running chains. | |
Protected Types | |
enum | EState { NORMAL, PAUSING, PAUSED } |
State of the pump. | |
typedef std::vector< LLChainInfo > | pending_chains_t |
typedef std::list< LLChainInfo > | running_chains_t |
typedef running_chains_t::iterator | current_chain_t |
typedef std::vector< LLChainInfo > | callbacks_t |
Protected Member Functions | |
void | initialize (apr_pool_t *pool) |
void | cleanup () |
void | rebuildPollset () |
Given the internal state of the chains, rebuild the pollset. | |
void | processChain (LLChainInfo &chain) |
Process the chain passed in. | |
bool | handleChainError (LLChainInfo &chain, LLIOPipe::EStatus error) |
Rewind through the chain to try to recover from an error. | |
Protected Attributes | |
EState | mState |
bool | mRebuildPollset |
apr_pollset_t * | mPollset |
S32 | mPollsetClientID |
S32 | mNextLock |
std::set< S32 > | mClearLocks |
LLRunner | mRunner |
pending_chains_t | mPendingChains |
running_chains_t | mRunningChains |
current_chain_t | mCurrentChain |
callbacks_t | mPendingCallbacks |
callbacks_t | mCallbacks |
apr_pool_t * | mPool |
apr_pool_t * | mCurrentPool |
S32 | mCurrentPoolReallocCount |
int * | mChainsMutex |
int * | mCallbackMutex |
Classes | |
struct | LLChainInfo |
struct | LLLinkInfo |
Struct to associate a pipe with its buffer io indexes. |
The pump class provides a thread abstraction for doing IO based communication between two threads in a structured way that is optimized for processor time. The primary usage is to create a pump, call pump() on a thread used for IO, and call respond() on a thread that is expected to do higher level processing. You can call almost any other method from any thread; see the notes for each method for details. In order for the threading abstraction to work, you need to call prime() with a valid apr pool.
A pump instance manages much of the state for the pipe, including the list of pipes in the chain, the channel for each element in the chain, the buffer, and whether any pipe has marked the stream or process as done. Pipes can also set file descriptor based conditional statements so that calls to process do not happen until data is ready to be read or written. Pipes control execution of calls to process by returning a status code such as STATUS_OK or STATUS_BREAK. One way to conceptualize the way IO will work is that a pump combines the unit processing of pipes to behave like file pipes on the unix command line.
Definition at line 81 of file llpumpio.h.
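The unix pipeline analogy in the description above can be illustrated with a toy model. This is only a sketch under stated assumptions: it does not use the real LLIOPipe or LLPumpIO types, the names `ToyStatus`, `ToyPipe`, and `run_chain` are hypothetical, and it assumes that an OK status means "continue to the next pipe" while a BREAK status means "stop this pass".

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins; these are not the real LLIOPipe::EStatus values.
enum class ToyStatus { OK, BREAK };

// A toy pipe transforms the shared buffer in place and reports a status.
using ToyPipe = std::function<ToyStatus(std::string&)>;

// Run each pipe in order over one shared buffer, like `a | b | c`.
// Returns the index of the first pipe that did not return OK,
// or chain.size() if every pipe ran.
std::size_t run_chain(const std::vector<ToyPipe>& chain, std::string& buffer)
{
    for (std::size_t i = 0; i < chain.size(); ++i)
    {
        assert(chain[i]);  // every link must hold a callable
        if (chain[i](buffer) != ToyStatus::OK)
        {
            return i;  // assumed meaning: stop here for this pass
        }
    }
    return chain.size();
}

// Example pipe: upper-case the whole buffer.
ToyStatus to_upper(std::string& buf)
{
    for (char& c : buf)
    {
        c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    }
    return ToyStatus::OK;
}

// Example pipe: append a marker to the buffer.
ToyStatus add_bang(std::string& buf)
{
    buf += '!';
    return ToyStatus::OK;
}

// Example pipe: always reports that it cannot proceed yet.
ToyStatus not_ready(std::string&)
{
    return ToyStatus::BREAK;
}
```

Running the chain `{ to_upper, add_bang }` over a buffer holding `hello` leaves `HELLO!` in the buffer. Inserting `not_ready` in the middle stops the pass before `add_bang` runs.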
typedef std::vector<LLIOPipe::ptr_t> LLPumpIO::chain_t |
typedef std::vector<LLLinkInfo> LLPumpIO::links_t |
typedef std::vector<LLChainInfo> LLPumpIO::pending_chains_t [protected] |
Definition at line 370 of file llpumpio.h.
typedef std::list<LLChainInfo> LLPumpIO::running_chains_t [protected] |
Definition at line 372 of file llpumpio.h.
typedef running_chains_t::iterator LLPumpIO::current_chain_t [protected] |
Definition at line 375 of file llpumpio.h.
typedef std::vector<LLChainInfo> LLPumpIO::callbacks_t [protected] |
Definition at line 381 of file llpumpio.h.
enum LLPumpIO::EControl |
enum LLPumpIO::EState [protected] |
LLPumpIO::LLPumpIO | ( | apr_pool_t * | pool | ) |
Constructor.
Definition at line 169 of file llpumpio.cpp.
References initialize(), and LLMemType::MTYPE_IO_PUMP.
LLPumpIO::~LLPumpIO | ( | ) |
Destructor.
Definition at line 185 of file llpumpio.cpp.
References cleanup(), and LLMemType::MTYPE_IO_PUMP.
bool LLPumpIO::prime | ( | apr_pool_t * | pool | ) |
Prepare this pump for usage.
If you fail to call this method prior to use, the pump will try to work, but will not come with any thread locking mechanisms.
pool | The apr pool to use. |
Definition at line 191 of file llpumpio.cpp.
References cleanup(), initialize(), LLMemType::MTYPE_IO_PUMP, and NULL.
Referenced by LLCrashLogger::init().
bool LLPumpIO::addChain | ( | const chain_t & | chain, | |
F32 | timeout | |||
) |
Add a chain to this pump and process in the next cycle.
This method will automatically generate a buffer and assign each link in the chain as if it were the consumer to the previous.
chain | The pipes for the chain | |
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
Definition at line 199 of file llpumpio.cpp.
References lldebugs, llendl, LLPumpIO::LLChainInfo::mChainLinks, mChainsMutex, LLPumpIO::LLLinkInfo::mChannels, LLPumpIO::LLChainInfo::mData, mPendingChains, LLPumpIO::LLLinkInfo::mPipe, LLMemType::MTYPE_IO_PUMP, and LLPumpIO::LLChainInfo::setTimeoutSeconds().
Referenced by LLDeferredChain::addToPump(), LLIOHTTPServer::create(), tut::HTTPServiceTestData::makeRequest(), LLSDRPCClient::process_impl(), LLIOAddChain::process_impl(), LLIOServerSocket::process_impl(), LLHTTPResponder::process_impl(), tut::rpc_server_data::pump_loop(), request(), and LLVoiceClient::stateMachine().
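The phrase "assign each link in the chain as if it were the consumer to the previous" can be pictured as a channel numbering rule: each link reads from the channel the previous link writes to. The sketch below is a hypothetical model, not the LLLinkInfo or buffer API. `ToyLink` and `make_links` are invented names, and numbering channels with consecutive integers from 0 is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a link: a pipe name plus its buffer channels.
struct ToyLink
{
    std::string pipe;
    int in;   // channel this link reads from
    int out;  // channel this link writes to
};

// Assign channels so that each link consumes what the previous one produced.
// The numbering scheme (consecutive integers from 0) is an assumption.
std::vector<ToyLink> make_links(const std::vector<std::string>& pipes)
{
    std::vector<ToyLink> links;
    links.reserve(pipes.size());
    for (std::size_t i = 0; i < pipes.size(); ++i)
    {
        const int base = static_cast<int>(i);
        links.push_back(ToyLink{pipes[i], base, base + 1});
    }
    // Invariant: links[i].in == links[i - 1].out for every i > 0.
    for (std::size_t i = 1; i < links.size(); ++i)
    {
        assert(links[i].in == links[i - 1].out);
    }
    return links;
}
```

With three pipes the links get channels (0, 1), (1, 2), and (2, 3), so data written by one stage is exactly what the next stage reads.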
bool LLPumpIO::addChain | ( | const links_t & | links, | |
LLIOPipe::buffer_ptr_t | data, | |||
LLSD | context, | |||
F32 | timeout | |||
) |
Add a chain to this pump and process in the next cycle.
This method provides a slightly more sophisticated method for adding a chain where the caller can specify which link elements are on what channels. This method will fail if no buffer is provided since any calls to generate new channels for the buffers will cause unpredictable interleaving of data.
links | The pipes and io indexes for the chain | |
data | Shared pointer to data buffer | |
context | Potentially undefined context meta-data for chain. | |
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
Definition at line 229 of file llpumpio.cpp.
References lldebugs, llendl, LLPumpIO::LLChainInfo::mChainLinks, mChainsMutex, LLPumpIO::LLChainInfo::mContext, LLPumpIO::LLChainInfo::mData, mPendingChains, LLMemType::MTYPE_IO_PUMP, and LLPumpIO::LLChainInfo::setTimeoutSeconds().
bool LLPumpIO::setTimeoutSeconds | ( | F32 | timeout | ) |
Set or clear a timeout for the running chain.
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
Definition at line 261 of file llpumpio.cpp.
References mCurrentChain.
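The "0.0f means never expire" convention can be modelled in a few lines. The sketch below is an illustration only: `ToyTimeout` is a hypothetical class, and passing the current time in explicitly (as `now`, in seconds) is a simplification chosen for this example, not how the pump obtains time.

```cpp
#include <cassert>

// Hypothetical timeout holder; times are plain seconds supplied by the caller.
class ToyTimeout
{
public:
    // A timeout greater than zero arms the timer relative to `now`;
    // a timeout of 0.0f clears it so the chain never expires.
    void setTimeoutSeconds(float timeout, double now)
    {
        assert(timeout >= 0.0f);  // negative timeouts are not modelled
        mArmed = timeout > 0.0f;
        mExpiry = now + timeout;
    }

    // True only when a timeout is armed and its deadline has passed.
    bool expired(double now) const { return mArmed && now >= mExpiry; }

private:
    bool mArmed = false;
    double mExpiry = 0.0;
};
```

Calling `setTimeoutSeconds(0.0f, now)` on an armed timer clears it, which matches the "set or clear" wording in the brief description.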
bool LLPumpIO::setConditional | ( | LLIOPipe * | pipe, | |
const apr_pollfd_t * | poll | |||
) |
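Set up file descriptors for the running chain.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
pipe | The pipe which is setting a conditional | |
poll | The entire socket and read/write condition - null to remove |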
Definition at line 302 of file llpumpio.cpp.
References events_2_string(), ll_debug_poll_fd(), lldebugs, llendl, mPollsetClientID, mPool, mRebuildPollset, LLMemType::MTYPE_IO_PUMP, and S32.
Referenced by LLIOServerSocket::process_impl(), LLIOSocketWriter::process_impl(), and LLIOSocketReader::process_impl().
S32 LLPumpIO::setLock | ( | ) |
Lock the current chain.
This locks the currently running chain so that no further calls are made to process() until you call clearLock() with the lock identifier.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
Returns the lock identifier to be used in clearLock(), or 0 on failure.
Definition at line 356 of file llpumpio.cpp.
References mCurrentChain, and mNextLock.
Referenced by LLHTTPPipe::lockChain(), LLSDRPCServer::process_impl(), and sleepChain().
void LLPumpIO::clearLock | ( | S32 | key | ) |
Clears the identified lock.
key | The lock identifier returned by setLock() |
Definition at line 384 of file llpumpio.cpp.
References mChainsMutex, and mClearLocks.
Referenced by LLSDRPCServer::clearLock(), LLSDRPCServer::process_impl(), LLChainSleeper::run(), and LLHTTPPipe::unlockChain().
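The setLock() and clearLock() pairing can be sketched as simple key bookkeeping. The model below is hypothetical: `ToyChainLock` is an invented class, the failure condition (the chain is already locked) is an assumption, and deferring the release to a later `sweep()` is only a guess based on the mClearLocks attribute.

```cpp
#include <cassert>
#include <set>

// Hypothetical model of lock bookkeeping for a single chain.
class ToyChainLock
{
public:
    // Returns a non-zero key identifying the new lock, or 0 on failure.
    // Assumption: locking an already locked chain counts as failure.
    int setLock()
    {
        if (mLock != 0) return 0;
        mLock = mNextLock++;
        return mLock;
    }

    // Queue a key for clearing; it takes effect on the next sweep().
    void clearLock(int key)
    {
        assert(key != 0);  // 0 is reserved to signal failure
        mClearLocks.insert(key);
    }

    // Called once per cycle: release the lock if its key was queued,
    // then discard any other queued keys.
    void sweep()
    {
        if (mLock != 0 && mClearLocks.erase(mLock) > 0) mLock = 0;
        mClearLocks.clear();
    }

    bool locked() const { return mLock != 0; }

private:
    int mLock = 0;      // 0 means "not locked"
    int mNextLock = 1;  // keys start at 1 so 0 can signal failure
    std::set<int> mClearLocks;
};
```

In this model a chain stays locked after `clearLock(key)` until the next `sweep()`, and a key that does not match the current lock has no effect.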
bool LLPumpIO::sleepChain | ( | F64 | seconds | ) |
Stop processing a chain for a while.
seconds | The number of seconds in the future to resume processing. |
Definition at line 398 of file llpumpio.cpp.
References LLRunner::addRunnable(), LLChainSleeper::build(), indra::ipc::xml_rpc::handle(), mRunner, LLRunner::RUN_IN, S32, and setLock().
Referenced by LLIOSleeper::process_impl(), and LLIOSleep::process_impl().
bool LLPumpIO::copyCurrentLinkInfo | ( | links_t & | links | ) | const |
Copy the currently running chain link info.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
links | A container for the links which will be appended |
Definition at line 414 of file llpumpio.cpp.
References mCurrentChain, and LLMemType::MTYPE_IO_PUMP.
Referenced by LLHTTPResponder::process_impl().
void LLPumpIO::pump | ( | const S32 & | poll_timeout | ) |
Call this method to call process on all running chains.
This method iterates through the running chains. If all pipes on a chain are unconditionally ready, or if any pipe on that chain has a conditional processing condition that is met (for example, the chain has a file descriptor ready), process() will be called for all pipes which have requested it.
Definition at line 434 of file llpumpio.cpp.
References DEFAULT_CHAIN_EXPIRY_SECS, END_PUMP_DEBUG, events_2_string(), LLFastTimer::FTM_PUMP, handleChainError(), ll_debug_poll_fd(), lldebugs, llendl, llinfos, llwarns, mChainsMutex, mClearLocks, mCurrentChain, mPendingChains, mPollset, mRebuildPollset, mRunner, mRunningChains, mState, LLMemType::MTYPE_IO_PUMP, NULL, PAUSED, PAUSING, processChain(), PUMP_DEBUG, rebuildPollset(), LLRunner::run(), S32, LLIOPipe::STATUS_ERROR, LLIOPipe::STATUS_EXPIRED, and LLIOPipe::STATUS_LOST_CONNECTION.
Referenced by LLMessageSystem::checkAllMessages(), LLAppViewer::mainLoop(), pump_loop(), tut::rpc_server_data::pump_loop(), tut::HTTPServiceTestData::pumpPipe(), tut::HTTPClientTestData::runThePump(), and LLCrashLogger::updateApplication().
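The "process in the next cycle" behaviour of addChain() together with pump() can be pictured as a pending vector that is promoted into a running list at the start of each cycle, matching the container types of pending_chains_t and running_chains_t. The sketch below is a hypothetical model only: `ToyPump` is an invented class, chains are reduced to names, and the `done` predicate stands in for whatever decides that a chain has finished.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of the pending -> running hand-off.
class ToyPump
{
public:
    // Queue a chain; it does not run until the next pump() call.
    void addChain(std::string name)
    {
        assert(!name.empty());  // unnamed chains are not modelled
        mPending.push_back(std::move(name));
    }

    // One cycle: promote pending chains, then visit every running chain.
    // `done` decides whether a chain is finished and should be removed.
    template <typename Done>
    void pump(Done done)
    {
        mRunning.insert(mRunning.end(), mPending.begin(), mPending.end());
        mPending.clear();
        for (auto it = mRunning.begin(); it != mRunning.end();)
        {
            if (done(*it)) it = mRunning.erase(it);
            else ++it;
        }
    }

    std::size_t runningChains() const { return mRunning.size(); }

private:
    std::vector<std::string> mPending;  // filled by addChain()
    std::list<std::string> mRunning;    // cheap erase while iterating
};
```

A list suits the running set because finished chains can be erased in the middle of the iteration without invalidating the remaining iterators.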
void LLPumpIO::pump | ( | ) |
bool LLPumpIO::respond | ( | LLIOPipe * | pipe | ) |
Add a pipe to a special queue which will be called during the next call to callback() and then dropped from the queue.
This call will add a single pipe, with no buffer, context, or channel information, to the callback queue. It will be called once, and then dropped.
pipe | A single io pipe which will be called |
Definition at line 721 of file llpumpio.cpp.
References mCallbackMutex, LLPumpIO::LLChainInfo::mChainLinks, mPendingCallbacks, LLPumpIO::LLLinkInfo::mPipe, LLMemType::MTYPE_IO_PUMP, and NULL.
Referenced by LLURLRequest::handleError(), LLURLRequest::process_impl(), LLSDRPCServer::process_impl(), and LLSDRPCClient::process_impl().
bool LLPumpIO::respond | ( | const links_t & | links, | |
LLIOPipe::buffer_ptr_t | data, | |||
LLSD | context | |||
) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue.
It is important to remember that you should not add a data buffer or context which may still be in another chain, since that will almost certainly lead to problems. Ensure that you are done reading and writing to those parameters, or pass newly generated or empty pointers.
links | The pipes and io indexes for the chain | |
data | Shared pointer to data buffer | |
context | Potentially undefined context meta-data for chain. |
Definition at line 737 of file llpumpio.cpp.
References mCallbackMutex, LLPumpIO::LLChainInfo::mChainLinks, LLPumpIO::LLChainInfo::mContext, LLPumpIO::LLChainInfo::mData, mPendingCallbacks, and LLMemType::MTYPE_IO_PUMP.
void LLPumpIO::callback | ( | ) |
Run through the callback queue and call process().
This call will process all pending responses and call process on each. This method will then drop all processed callback requests, which may lead to deleting the referenced objects.
Definition at line 761 of file llpumpio.cpp.
References mCallbackMutex, mCallbacks, mPendingCallbacks, LLMemType::MTYPE_IO_PUMP, and processChain().
Referenced by LLMessageSystem::checkAllMessages(), LLAppViewer::mainLoop(), pump_loop(), tut::rpc_server_data::pump_loop(), tut::HTTPServiceTestData::pumpPipe(), tut::HTTPClientTestData::runThePump(), and LLCrashLogger::updateApplication().
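The respond() and callback() pair describes a run-once queue: work is queued from one place and later drained, executed once, and dropped. A minimal sketch of that pattern follows, assuming a mutex-protected pending queue that is swapped out before the tasks run. `ToyCallbackQueue` and `Task` are hypothetical names, and tasks are plain callables instead of pipe chains.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical model of a respond()/callback() queue pair.
class ToyCallbackQueue
{
public:
    using Task = std::function<void()>;

    // May be called from any thread: queue a task for the next callback().
    void respond(Task task)
    {
        assert(task);  // an empty task would throw when invoked
        std::lock_guard<std::mutex> guard(mMutex);
        mPending.push_back(std::move(task));
    }

    // Run every queued task once, then drop it. Returns how many ran.
    std::size_t callback()
    {
        std::vector<Task> running;
        {
            // Hold the lock only long enough to take ownership of the queue.
            std::lock_guard<std::mutex> guard(mMutex);
            running.swap(mPending);
        }
        for (Task& task : running) task();
        return running.size();
    }

private:
    std::mutex mMutex;
    std::vector<Task> mPending;
};
```

Swapping the queue out under the lock means tasks run without the mutex held, so a task may itself call `respond()` and have its work picked up on the following `callback()`.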
void LLPumpIO::control | ( | LLPumpIO::EControl | op | ) |
Send a command to the pump.
op | What control to send to the pump. |
Definition at line 792 of file llpumpio.cpp.
References mChainsMutex, mState, NORMAL, PAUSE, PAUSING, and RESUME.
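The EControl commands and the EState values suggest a small state machine. The sketch below shows one plausible reading, not the confirmed implementation. It assumes that PAUSE only requests a pause (PAUSING) and that the pump completes the transition to PAUSED at the start of its next cycle. `Control`, `State`, `apply_control`, and `begin_cycle` are hypothetical names.

```cpp
#include <cassert>

// Hypothetical mirrors of the EControl and EState enumerations.
enum class Control { PAUSE, RESUME };
enum class State { NORMAL, PAUSING, PAUSED };

// Apply a command. Assumption: PAUSE only requests a pause, and the
// pump itself completes the transition at the start of its next cycle.
State apply_control(State current, Control op)
{
    switch (op)
    {
    case Control::PAUSE:
        return current == State::PAUSED ? State::PAUSED : State::PAUSING;
    case Control::RESUME:
        return State::NORMAL;
    }
    assert(false && "unreachable: unknown Control value");
    return current;
}

// Called at the top of each pump cycle. Returns true if chains should run.
bool begin_cycle(State& state)
{
    if (state == State::PAUSING) state = State::PAUSED;
    return state == State::NORMAL;
}
```

Splitting the pause into a request and a completion lets the command be issued from another thread while the pump thread decides when it is safe to stop.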
void LLPumpIO::initialize | ( | apr_pool_t * | pool | ) | [protected] |
Definition at line 811 of file llpumpio.cpp.
References mCallbackMutex, mChainsMutex, mPool, and LLMemType::MTYPE_IO_PUMP.
Referenced by LLPumpIO(), and prime().
void LLPumpIO::cleanup | ( | ) | [protected] |
Definition at line 823 of file llpumpio.cpp.
References mCallbackMutex, mChainsMutex, mCurrentPool, mPollset, mPool, LLMemType::MTYPE_IO_PUMP, and NULL.
Referenced by prime(), and ~LLPumpIO().
void LLPumpIO::rebuildPollset | ( | ) | [protected] |
Given the internal state of the chains, rebuild the pollset.
Definition at line 846 of file llpumpio.cpp.
References ll_apr_warn_status(), mCurrentPool, mCurrentPoolReallocCount, mPollset, mPool, mRunningChains, LLMemType::MTYPE_IO_PUMP, NULL, S32, and indra::ipc::saranwrap::status().
Referenced by pump().
void LLPumpIO::processChain | ( | LLChainInfo & | chain | ) | [protected] |
Process the chain passed in.
This method will potentially modify the internals of the chain. On completion, chain.mHead will equal chain.mChainLinks.end().
chain | The LLChainInfo object to work on. |
Definition at line 898 of file llpumpio.cpp.
References handleChainError(), LLIOPipe::isError(), LLIOPipe::isSuccess(), llendl, llinfos, LLIOPipe::lookupStatusString(), LLPumpIO::LLChainInfo::mChainLinks, LLPumpIO::LLChainInfo::mContext, LLPumpIO::LLChainInfo::mData, LLPumpIO::LLChainInfo::mEOS, LLPumpIO::LLChainInfo::mHead, LLMemType::MTYPE_IO_PUMP, NULL, PUMP_DEBUG, S32, indra::ipc::saranwrap::status(), LLIOPipe::STATUS_BREAK, LLIOPipe::STATUS_DONE, LLIOPipe::STATUS_NEED_PROCESS, LLIOPipe::STATUS_OK, and LLIOPipe::STATUS_STOP.
Referenced by callback(), and pump().
bool LLPumpIO::handleChainError | ( | LLChainInfo & | chain, | |
LLIOPipe::EStatus | error | |||
) | [protected] |
Rewind through the chain to try to recover from an error.
This method will potentially modify the internals of the chain.
chain | The LLChainInfo object to work on. |
Definition at line 1081 of file llpumpio.cpp.
References LLIOPipe::isSuccess(), lldebugs, llendl, llinfos, LLIOPipe::lookupStatusString(), LLPumpIO::LLChainInfo::mChainLinks, LLPumpIO::LLChainInfo::mHead, LLMemType::MTYPE_IO_PUMP, LLIOPipe::STATUS_BREAK, LLIOPipe::STATUS_DONE, LLIOPipe::STATUS_ERROR, LLIOPipe::STATUS_NEED_PROCESS, LLIOPipe::STATUS_OK, and LLIOPipe::STATUS_STOP.
Referenced by processChain(), and pump().
running_chains_t::size_type LLPumpIO::runningChains | ( | ) | const [inline] |
Return number of running chains.
*NOTE: This is only used in debugging and not considered efficient or safe enough for production use.
Definition at line 435 of file llpumpio.h.
References mRunningChains.
EState LLPumpIO::mState [protected] |
bool LLPumpIO::mRebuildPollset [protected] |
apr_pollset_t* LLPumpIO::mPollset [protected] |
S32 LLPumpIO::mPollsetClientID [protected] |
S32 LLPumpIO::mNextLock [protected] |
std::set<S32> LLPumpIO::mClearLocks [protected] |
LLRunner LLPumpIO::mRunner [protected] |
pending_chains_t LLPumpIO::mPendingChains [protected] |
running_chains_t LLPumpIO::mRunningChains [protected] |
Definition at line 373 of file llpumpio.h.
Referenced by pump(), rebuildPollset(), and runningChains().
current_chain_t LLPumpIO::mCurrentChain [protected] |
Definition at line 376 of file llpumpio.h.
Referenced by copyCurrentLinkInfo(), pump(), setLock(), and setTimeoutSeconds().
callbacks_t LLPumpIO::mPendingCallbacks [protected] |
callbacks_t LLPumpIO::mCallbacks [protected] |
apr_pool_t* LLPumpIO::mPool [protected] |
Definition at line 386 of file llpumpio.h.
Referenced by cleanup(), initialize(), rebuildPollset(), and setConditional().
apr_pool_t* LLPumpIO::mCurrentPool [protected] |
S32 LLPumpIO::mCurrentPoolReallocCount [protected] |
int* LLPumpIO::mChainsMutex [protected] |
Definition at line 394 of file llpumpio.h.
Referenced by addChain(), cleanup(), clearLock(), control(), initialize(), and pump().
int* LLPumpIO::mCallbackMutex [protected] |
Definition at line 395 of file llpumpio.h.
Referenced by callback(), cleanup(), initialize(), and respond().