#include <llpumpio.h>
Public Types | |
typedef std::vector< LLIOPipe::ptr_t > | chain_t |
Typedef for having a chain of pipes. | |
typedef std::vector< LLLinkInfo > | links_t |
Typedef for having a chain of LLLinkInfo instances. | |
enum | EControl { PAUSE, RESUME } |
Enumeration to send commands to the pump. More... | |
Public Member Functions | |
LLPumpIO (apr_pool_t *pool) | |
Constructor. | |
~LLPumpIO () | |
Destructor. | |
bool | prime (apr_pool_t *pool) |
Prepare this pump for usage. | |
bool | addChain (const chain_t &chain, F32 timeout) |
Add a chain to this pump and process in the next cycle. | |
bool | addChain (const links_t &links, LLIOPipe::buffer_ptr_t data, LLSD context, F32 timeout) |
Add a chain to this pump and process in the next cycle. | |
bool | setTimeoutSeconds (F32 timeout) |
Set or clear a timeout for the running chain. | |
bool | setConditional (LLIOPipe *pipe, const apr_pollfd_t *poll) |
Set up file descriptors for the running chain. | |
S32 | setLock () |
Lock the current chain. | |
void | clearLock (S32 key) |
Clears the identified lock. | |
bool | sleepChain (F64 seconds) |
Stop processing a chain for a while. | |
bool | copyCurrentLinkInfo (links_t &links) const |
Copy the currently running chain link info. | |
void | pump (const S32 &poll_timeout) |
Call this method to call process on all running chains. | |
void | pump () |
bool | respond (LLIOPipe *pipe) |
Add pipe to a special queue which will be called during the next call to callback() and then dropped from the queue. | |
bool | respond (const links_t &links, LLIOPipe::buffer_ptr_t data, LLSD context) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue. | |
void | callback () |
Run through the callback queue and call process() . | |
void | control (EControl op) |
Send a command to the pump. | |
Protected Types | |
typedef std::vector< LLChainInfo > | pending_chains_t |
typedef std::list< LLChainInfo > | running_chains_t |
typedef running_chains_t::iterator | current_chain_t |
typedef std::vector< LLChainInfo > | callbacks_t |
enum | EState { NORMAL, PAUSING, PAUSED } |
State of the pump. More... | |
Protected Member Functions | |
void | initialize (apr_pool_t *pool) |
void | cleanup () |
void | rebuildPollset () |
Given the internal state of the chains, rebuild the pollset. | |
void | processChain (LLChainInfo &chain) |
Process the chain passed in. | |
bool | handleChainError (LLChainInfo &chain, LLIOPipe::EStatus error) |
Rewind through the chain to try to recover from an error. | |
Protected Attributes | |
EState | mState |
bool | mRebuildPollset |
apr_pollset_t * | mPollset |
S32 | mPollsetClientID |
S32 | mNextLock |
std::set< S32 > | mClearLocks |
LLRunner | mRunner |
pending_chains_t | mPendingChains |
running_chains_t | mRunningChains |
current_chain_t | mCurrentChain |
callbacks_t | mPendingCallbacks |
callbacks_t | mCallbacks |
apr_pool_t * | mPool |
apr_pool_t * | mCurrentPool |
S32 | mCurrentPoolReallocCount |
int * | mChainsMutex |
int * | mCallbackMutex |
Classes | |
struct | LLChainInfo |
struct | LLLinkInfo |
Struct to associate a pipe with its buffer io indexes. More... |
The pump class provides a thread abstraction for IO-based communication between two threads in a way that is structured and economical with processor time. The primary usage is to create a pump, call pump() on a thread used for IO, and call respond() on a thread that is expected to do higher level processing. You can call almost any other method from any thread; see the notes for each method for details. In order for the threading abstraction to work, you need to call prime() with a valid apr pool.
A pump instance manages much of the state for the pipe, including the list of pipes in the chain, the channel for each element in the chain, the buffer, and whether any pipe has marked the stream or process as done. Pipes can also set file descriptor based conditionals so that calls to process do not happen until data is ready to be read or written. Pipes control execution of calls to process by returning a status code such as STATUS_OK or STATUS_BREAK.
One way to conceptualize the way IO will work is that a pump combines the unit processing of pipes to behave like file pipes on the unix command line.
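The unix pipe analogy can be made concrete with a small self-contained model. This is only a sketch: `MiniPipe`, `MiniChain`, `runChain`, the two example pipes, and the `Status` enum are hypothetical stand-ins invented for illustration, not types from llpumpio.h, and a plain `std::string` stands in for the shared buffer.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical status codes, loosely modelled on STATUS_OK / STATUS_BREAK.
enum class Status { OK, BREAK, DONE };

// Hypothetical pipe interface: one unit of processing over a shared buffer.
struct MiniPipe {
    virtual ~MiniPipe() = default;
    // Transform the buffer in place and report how the chain should proceed.
    virtual Status process(std::string& buffer) = 0;
};

// Example pipe: upper-case ASCII letters, then let the chain continue.
struct Upper : MiniPipe {
    Status process(std::string& buffer) override {
        for (char& c : buffer) {
            if (c >= 'a' && c <= 'z') c = static_cast<char>(c - 'a' + 'A');
        }
        return Status::OK;
    }
};

// Example pipe: append a marker and report that the stream is finished.
struct Exclaim : MiniPipe {
    Status process(std::string& buffer) override {
        buffer += "!";
        return Status::DONE;
    }
};

using MiniChain = std::vector<std::shared_ptr<MiniPipe>>;

// Run each pipe in order over one buffer, like `a | b` on a shell.
// Stops at the first pipe that returns anything other than OK.
inline Status runChain(const MiniChain& chain, std::string& buffer) {
    assert(!chain.empty());
    Status status = Status::OK;
    for (const auto& pipe : chain) {
        status = pipe->process(buffer);
        if (status != Status::OK) break;
    }
    return status;
}
```

Running a chain of `Upper` followed by `Exclaim` over the buffer "hello" leaves "HELLO!" in the buffer. The model leaves out threading, channels, and polling entirely.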
Definition at line 81 of file llpumpio.h.
typedef std::vector<LLChainInfo> LLPumpIO::callbacks_t [protected] |
Definition at line 381 of file llpumpio.h.
typedef std::vector<LLIOPipe::ptr_t> LLPumpIO::chain_t |
typedef running_chains_t::iterator LLPumpIO::current_chain_t [protected] |
Definition at line 375 of file llpumpio.h.
typedef std::vector<LLLinkInfo> LLPumpIO::links_t |
typedef std::vector<LLChainInfo> LLPumpIO::pending_chains_t [protected] |
Definition at line 370 of file llpumpio.h.
typedef std::list<LLChainInfo> LLPumpIO::running_chains_t [protected] |
Definition at line 372 of file llpumpio.h.
enum LLPumpIO::EControl |
enum LLPumpIO::EState [protected] |
LLPumpIO::LLPumpIO | ( | apr_pool_t * | pool | ) |
Constructor.
Definition at line 117 of file llpumpio.cpp.
References initialize(), and LLMemType::MTYPE_IO_PUMP.
LLPumpIO::~LLPumpIO | ( | ) |
Destructor.
Definition at line 133 of file llpumpio.cpp.
References cleanup(), and LLMemType::MTYPE_IO_PUMP.
bool LLPumpIO::addChain | ( | const links_t & | links, | |
LLIOPipe::buffer_ptr_t | data, | |||
LLSD | context, | |||
F32 | timeout | |||
) |
Add a chain to this pump and process in the next cycle.
This method provides a slightly more sophisticated method for adding a chain where the caller can specify which link elements are on what channels. This method will fail if no buffer is provided since any calls to generate new channels for the buffers will cause unpredictable interleaving of data.
links | The pipes and io indexes for the chain | |
data | Shared pointer to data buffer | |
context | Potentially undefined context meta-data for chain. | |
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
Definition at line 177 of file llpumpio.cpp.
References lldebugs, llendl, LLPumpIO::LLChainInfo::mChainLinks, mChainsMutex, LLPumpIO::LLChainInfo::mContext, LLPumpIO::LLChainInfo::mData, mPendingChains, LLMemType::MTYPE_IO_PUMP, and LLPumpIO::LLChainInfo::setTimeoutSeconds().
bool LLPumpIO::addChain | ( | const chain_t & | chain, | |
F32 | timeout | |||
) |
Add a chain to this pump and process in the next cycle.
This method will automatically generate a buffer and assign each link in the chain as if it were the consumer to the previous.
chain | The pipes for the chain | |
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
Definition at line 147 of file llpumpio.cpp.
References end, lldebugs, llendl, LLPumpIO::LLChainInfo::mChainLinks, mChainsMutex, LLPumpIO::LLLinkInfo::mChannels, LLPumpIO::LLChainInfo::mData, mPendingChains, LLPumpIO::LLLinkInfo::mPipe, LLMemType::MTYPE_IO_PUMP, and LLPumpIO::LLChainInfo::setTimeoutSeconds().
Referenced by LLDeferredChain::addToPump(), LLIOHTTPServer::create(), tut::HTTPServiceTestData::makeRequest(), LLSDRPCClient::process_impl(), LLIOAddChain::process_impl(), LLIOServerSocket::process_impl(), LLHTTPResponder::process_impl(), tut::rpc_server_data::pump_loop(), request(), and LLVoiceClient::stateMachine().
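The phrase "as if it were the consumer to the previous" can be pictured as a channel assignment in which each link reads from the channel the previous link wrote to. The sketch below is an assumption-laden illustration: `LinkChannels` and `assignChannels` are hypothetical names, and numbering channels from 0 is chosen only for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of a link: which channel it reads and which it writes.
struct LinkChannels {
    int in;
    int out;
};

// Assign channels so that link i reads what link i-1 wrote.
// Starting the numbering at 0 is an assumption for illustration only.
inline std::vector<LinkChannels> assignChannels(std::size_t pipeCount) {
    std::vector<LinkChannels> links;
    links.reserve(pipeCount);
    int next = 0;
    for (std::size_t i = 0; i < pipeCount; ++i) {
        links.push_back(LinkChannels{next, next + 1});
        ++next;
    }
    assert(links.size() == pipeCount);
    return links;
}
```

With three pipes this yields the pairs (0, 1), (1, 2), (2, 3), so every link's input is its predecessor's output. The links_t overload exists for callers who need a different wiring.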
void LLPumpIO::callback | ( | ) |
Run through the callback queue and call process().
This call will process all pending responses and call process on each. This method will then drop all processed callback requests, which may lead to deleting the referenced objects.
Definition at line 635 of file llpumpio.cpp.
References end, mCallbackMutex, mCallbacks, mPendingCallbacks, LLMemType::MTYPE_IO_PUMP, and processChain().
Referenced by LLMessageSystem::checkAllMessages(), main_loop(), pump_loop(), tut::rpc_server_data::pump_loop(), tut::HTTPServiceTestData::pumpPipe(), and tut::HTTPClientTestData::runThePump().
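The "process once, then drop" behaviour, together with the mPendingCallbacks and mCallbacks members referenced above, suggests a two-queue pattern: entries are queued under a mutex and then moved out as a batch before being run. The sketch below models that pattern with standard C++ only. `CallbackQueue` is a hypothetical class that stores plain functions rather than chain records, and the swap under lock is an assumption about the general shape, not a transcription of llpumpio.cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical two-queue model of a respond()/callback() pair.
class CallbackQueue {
public:
    // Queue work to be run by the next callback() call.
    void respond(std::function<void()> work) {
        assert(static_cast<bool>(work));
        std::lock_guard<std::mutex> lock(mMutex);
        mPending.push_back(std::move(work));
    }

    // Run everything queued so far exactly once, then drop it.
    // Returns the number of entries that were run.
    std::size_t callback() {
        std::vector<std::function<void()>> running;
        {
            // Hold the lock only long enough to take the batch.
            std::lock_guard<std::mutex> lock(mMutex);
            running.swap(mPending);
        }
        for (auto& work : running) work();
        return running.size();
    }

private:
    std::mutex mMutex;
    std::vector<std::function<void()>> mPending;
};
```

Taking the batch before running it keeps the lock short and means work queued during a callback() call waits for the next one.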
void LLPumpIO::cleanup | ( | ) | [protected] |
Definition at line 697 of file llpumpio.cpp.
References mCallbackMutex, mChainsMutex, mCurrentPool, mPollset, mPool, LLMemType::MTYPE_IO_PUMP, and NULL.
Referenced by prime(), and ~LLPumpIO().
void LLPumpIO::clearLock | ( | S32 | key | )
Clears the identified lock.
key | The lock identifier returned by setLock() |
Definition at line 294 of file llpumpio.cpp.
References mChainsMutex, and mClearLocks.
Referenced by LLSDRPCServer::clearLock(), LLSDRPCServer::process_impl(), LLChainSleeper::run(), and LLHTTPPipe::unlockChain().
void LLPumpIO::control | ( | EControl | op | )
Send a command to the pump.
op | What control to send to the pump. |
Definition at line 666 of file llpumpio.cpp.
References mChainsMutex, mState, NORMAL, PAUSE, PAUSING, and RESUME.
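The EControl and EState enumerations hint at a small state machine. The page does not spell out the transitions, so the following is an assumed reading rather than documented behaviour: a PAUSE request moves NORMAL to PAUSING, the pump loop completes the pause at the start of its next cycle, and RESUME returns to NORMAL. `applyControl` and `beginCycle` are hypothetical helper names.

```cpp
#include <cassert>

enum class Control { PAUSE, RESUME };
enum class State { NORMAL, PAUSING, PAUSED };

// Assumed transitions: PAUSE requests a pause from NORMAL, and RESUME
// returns to NORMAL from any state.
inline State applyControl(State current, Control op) {
    switch (op) {
    case Control::PAUSE:
        return current == State::NORMAL ? State::PAUSING : current;
    case Control::RESUME:
        return State::NORMAL;
    }
    return current;
}

// Called once per pump cycle. Completes a requested pause and returns
// true only if chains should be processed during this cycle.
inline bool beginCycle(State& state) {
    if (state == State::PAUSING) state = State::PAUSED;
    assert(state != State::PAUSING);
    return state == State::NORMAL;
}
```

The intermediate PAUSING state lets the requesting thread return immediately while the pumping thread finishes the pause on its own schedule.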
bool LLPumpIO::copyCurrentLinkInfo | ( | links_t & | links | ) | const |
Copy the currently running chain link info.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
links | A container for the links which will be appended |
Definition at line 324 of file llpumpio.cpp.
References mCurrentChain, and LLMemType::MTYPE_IO_PUMP.
Referenced by LLHTTPResponder::process_impl().
bool LLPumpIO::handleChainError | ( | LLChainInfo & | chain, | |
LLIOPipe::EStatus | error | |||
) | [protected] |
Rewind through the chain to try to recover from an error.
This method will potentially modify the internals of the chain.
chain | The LLChainInfo object to work on. |
Definition at line 955 of file llpumpio.cpp.
References LLIOPipe::isSuccess(), lldebugs, llendl, llinfos, LLIOPipe::lookupStatusString(), LLPumpIO::LLChainInfo::mChainLinks, LLPumpIO::LLChainInfo::mHead, LLMemType::MTYPE_IO_PUMP, LLIOPipe::STATUS_BREAK, LLIOPipe::STATUS_DONE, LLIOPipe::STATUS_ERROR, LLIOPipe::STATUS_NEED_PROCESS, LLIOPipe::STATUS_OK, and LLIOPipe::STATUS_STOP.
Referenced by processChain(), and pump().
void LLPumpIO::initialize | ( | apr_pool_t * | pool | ) | [protected] |
Definition at line 685 of file llpumpio.cpp.
References mCallbackMutex, mChainsMutex, mPool, and LLMemType::MTYPE_IO_PUMP.
Referenced by LLPumpIO(), and prime().
bool LLPumpIO::prime | ( | apr_pool_t * | pool | ) |
Prepare this pump for usage.
If you fail to call this method prior to use, the pump will try to work, but will not come with any thread locking mechanisms.
pool | The apr pool to use. |
Definition at line 139 of file llpumpio.cpp.
References cleanup(), initialize(), LLMemType::MTYPE_IO_PUMP, and NULL.
void LLPumpIO::processChain | ( | LLChainInfo & | chain | ) | [protected] |
Process the chain passed in.
This method will potentially modify the internals of the chain. On end, the chain.mHead will equal chain.mChainLinks.end().
chain | The LLChainInfo object to work on. |
Definition at line 772 of file llpumpio.cpp.
References end, handleChainError(), LLIOPipe::isError(), LLIOPipe::isSuccess(), llendl, llinfos, LLIOPipe::lookupStatusString(), LLPumpIO::LLChainInfo::mChainLinks, LLPumpIO::LLChainInfo::mContext, LLPumpIO::LLChainInfo::mData, LLPumpIO::LLChainInfo::mEOS, LLPumpIO::LLChainInfo::mHead, LLMemType::MTYPE_IO_PUMP, NULL, PUMP_DEBUG, S32, LLIOPipe::STATUS_BREAK, LLIOPipe::STATUS_DONE, LLIOPipe::STATUS_NEED_PROCESS, LLIOPipe::STATUS_OK, and LLIOPipe::STATUS_STOP.
Referenced by callback(), and pump().
void LLPumpIO::pump | ( | ) |
Call this method to call process on all running chains.
This method iterates through the running chains. If all pipes on a chain are unconditionally ready, or if any pipe with a conditional on that chain has a file descriptor ready, process() will be called for all pipes which have requested it.
Definition at line 344 of file llpumpio.cpp.
References count, DEFAULT_CHAIN_EXPIRY_SECS, end, END_PUMP_DEBUG, LLFastTimer::FTM_PUMP, handleChainError(), i, lldebugs, llendl, llinfos, mChainsMutex, mClearLocks, mCurrentChain, mPendingChains, mPollset, mRebuildPollset, mRunner, mRunningChains, mState, LLMemType::MTYPE_IO_PUMP, NULL, PAUSED, PAUSING, processChain(), PUMP_DEBUG, rebuildPollset(), LLRunner::run(), S32, and LLIOPipe::STATUS_EXPIRED.
Referenced by LLMessageSystem::checkAllMessages(), main_loop(), pump_loop(), tut::rpc_server_data::pump_loop(), tut::HTTPServiceTestData::pumpPipe(), and tut::HTTPClientTestData::runThePump().
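The readiness rule for a single chain can be written as a small predicate. This is a sketch of the rule as described, not of the polling code: `LinkReadiness` and `chainShouldRun` are hypothetical names, and the `descriptorReady` flag stands in for whatever the poll reports.

```cpp
#include <cassert>
#include <vector>

// Hypothetical per-pipe readiness record; not a type from llpumpio.h.
struct LinkReadiness {
    bool hasConditional;   // pipe registered a file descriptor condition
    bool descriptorReady;  // that descriptor was reported ready by the poll
};

// A chain runs when no pipe is waiting on a descriptor, or when at least
// one waiting pipe's descriptor is ready.
inline bool chainShouldRun(const std::vector<LinkReadiness>& links) {
    assert(!links.empty());
    bool anyConditional = false;
    for (const LinkReadiness& link : links) {
        if (!link.hasConditional) continue;
        anyConditional = true;
        if (link.descriptorReady) return true;
    }
    return !anyConditional;
}
```

A chain whose only conditional pipe is still waiting is skipped for the cycle, which is how the pump avoids calling process() before data is available.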
void LLPumpIO::rebuildPollset | ( | ) | [protected] |
Given the internal state of the chains, rebuild the pollset.
Definition at line 720 of file llpumpio.cpp.
References ll_apr_warn_status(), mCurrentPool, mCurrentPoolReallocCount, mPollset, mPool, mRunningChains, LLMemType::MTYPE_IO_PUMP, NULL, S32, size, and void.
Referenced by pump().
bool LLPumpIO::respond | ( | const links_t & | links, | |
LLIOPipe::buffer_ptr_t | data, | |||
LLSD | context | |||
) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue.
It is important to remember that you should not add a data buffer or context which may still be in another chain, since that will almost certainly lead to problems. Ensure that you are done reading and writing to those parameters, or that they are newly generated or empty pointers.
links | The pipes and io indexes for the chain | |
data | Shared pointer to data buffer | |
context | Potentially undefined context meta-data for chain. |
Definition at line 611 of file llpumpio.cpp.
References mCallbackMutex, LLPumpIO::LLChainInfo::mChainLinks, LLPumpIO::LLChainInfo::mContext, LLPumpIO::LLChainInfo::mData, mPendingCallbacks, and LLMemType::MTYPE_IO_PUMP.
bool LLPumpIO::respond | ( | LLIOPipe * | pipe | ) |
Add pipe to a special queue which will be called during the next call to callback() and then dropped from the queue.
This call will add a single pipe, with no buffer, context, or channel information to the callback queue. It will be called once, and then dropped.
pipe | A single io pipe which will be called |
Definition at line 595 of file llpumpio.cpp.
References mCallbackMutex, LLPumpIO::LLChainInfo::mChainLinks, mPendingCallbacks, LLPumpIO::LLLinkInfo::mPipe, LLMemType::MTYPE_IO_PUMP, and NULL.
Referenced by LLURLRequest::process_impl(), LLSDRPCServer::process_impl(), and LLSDRPCClient::process_impl().
bool LLPumpIO::setConditional | ( | LLIOPipe * | pipe, | |
const apr_pollfd_t * | poll | |||
) |
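Set up file descriptors for the running chain.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
pipe | The pipe which is setting a conditional | |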
poll | The entire socket and read/write condition - null to remove |
Definition at line 220 of file llpumpio.cpp.
References mPollsetClientID, mPool, mRebuildPollset, LLMemType::MTYPE_IO_PUMP, and S32.
Referenced by LLIOServerSocket::process_impl(), LLIOSocketWriter::process_impl(), and LLIOSocketReader::process_impl().
S32 LLPumpIO::setLock | ( | ) |
Lock the current chain.
The locked chain receives no further calls to process() until you call clearLock() with the lock identifier.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
Returns the lock identifier to pass to clearLock(), or 0 on failure.
Definition at line 266 of file llpumpio.cpp.
References mCurrentChain, and mNextLock.
Referenced by LLHTTPPipe::lockChain(), LLSDRPCServer::process_impl(), and sleepChain().
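The key-based contract of setLock() and clearLock() can be sketched with a counter and a set. This is a simplified model under stated assumptions: `ChainLocks` is a hypothetical class, keys start at 1 so that 0 stays free to signal failure, and the real pump tracks locks per chain rather than in one shared set.

```cpp
#include <cassert>
#include <set>

// Hypothetical lock bookkeeping: nonzero keys, with 0 reserved for failure.
class ChainLocks {
public:
    // Hand out a fresh key; the owning chain is skipped while it is held.
    int setLock() {
        int key = mNextLock++;
        assert(key != 0);
        mHeld.insert(key);
        return key;
    }

    // Release a key. Unknown or already released keys are ignored.
    void clearLock(int key) { mHeld.erase(key); }

    // True while the key has been issued and not yet cleared.
    bool isLocked(int key) const { return mHeld.count(key) != 0; }

private:
    int mNextLock = 1;
    std::set<int> mHeld;
};
```

Handing out an opaque key lets a different object, possibly on a different thread, release the chain later without holding a reference to it.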
bool LLPumpIO::setTimeoutSeconds | ( | F32 | timeout | ) |
Set or clear a timeout for the running chain.
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
Definition at line 209 of file llpumpio.cpp.
References mCurrentChain.
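The "pass in 0.0f to never expire" convention can be modelled with a flag and a deadline. The sketch is illustrative only: `Expiry` is a hypothetical struct, and the caller supplies the current time in seconds instead of the struct reading a clock, which keeps the example deterministic.

```cpp
#include <cassert>

// Hypothetical expiry record; times are plain seconds supplied by the caller.
struct Expiry {
    bool enabled = false;
    double deadline = 0.0;

    // A timeout of 0 clears the expiry; a positive value arms it
    // relative to the supplied current time.
    void setTimeoutSeconds(double now, float timeout) {
        assert(timeout >= 0.0f);
        enabled = timeout > 0.0f;
        deadline = enabled ? now + timeout : 0.0;
    }

    // True once an armed deadline has been reached.
    bool expired(double now) const { return enabled && now >= deadline; }
};
```

Using 0 as the "never" value means the same call both sets and clears a timeout, which matches the one-method interface described above.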
bool LLPumpIO::sleepChain | ( | F64 | seconds | ) |
Stop processing a chain for a while.
seconds | The number of seconds in the future to resume processing. |
Definition at line 308 of file llpumpio.cpp.
References LLRunner::addRunnable(), LLChainSleeper::build(), mRunner, LLRunner::RUN_IN, S32, and setLock().
Referenced by LLIOSleep::process_impl().
int* LLPumpIO::mCallbackMutex [protected] |
Definition at line 395 of file llpumpio.h.
Referenced by callback(), cleanup(), initialize(), and respond().
callbacks_t LLPumpIO::mCallbacks [protected] |
int* LLPumpIO::mChainsMutex [protected] |
Definition at line 394 of file llpumpio.h.
Referenced by addChain(), cleanup(), clearLock(), control(), initialize(), and pump().
std::set<S32> LLPumpIO::mClearLocks [protected] |
current_chain_t LLPumpIO::mCurrentChain [protected] |
Definition at line 376 of file llpumpio.h.
Referenced by copyCurrentLinkInfo(), pump(), setLock(), and setTimeoutSeconds().
apr_pool_t* LLPumpIO::mCurrentPool [protected] |
S32 LLPumpIO::mCurrentPoolReallocCount [protected] |
S32 LLPumpIO::mNextLock [protected] |
callbacks_t LLPumpIO::mPendingCallbacks [protected] |
pending_chains_t LLPumpIO::mPendingChains [protected] |
apr_pollset_t* LLPumpIO::mPollset [protected] |
S32 LLPumpIO::mPollsetClientID [protected] |
apr_pool_t* LLPumpIO::mPool [protected] |
Definition at line 386 of file llpumpio.h.
Referenced by cleanup(), initialize(), rebuildPollset(), and setConditional().
bool LLPumpIO::mRebuildPollset [protected] |
LLRunner LLPumpIO::mRunner [protected] |
running_chains_t LLPumpIO::mRunningChains [protected] |
EState LLPumpIO::mState [protected] |