lliosocket.cpp

Go to the documentation of this file.
00001 
00034 #include "linden_common.h"
00035 #include "lliosocket.h"
00036 
00037 #include "llapr.h"
00038 
00039 #include "llbuffer.h"
00040 #include "llhost.h"
00041 #include "llmemtype.h"
00042 #include "llpumpio.h"
00043 
00044 //
00045 // constants
00046 //
00047 
00048 static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10;
00049 static const S32 LL_SEND_BUFFER_SIZE = 40000;
00050 static const S32 LL_RECV_BUFFER_SIZE = 40000;
00051 //static const U16 LL_PORT_DISCOVERY_RANGE_MIN = 13000;
00052 //static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050;
00053 
00054 //
00055 // local methods 
00056 //
00057 
00058 bool is_addr_in_use(apr_status_t status)
00059 {
00060 #if LL_WINDOWS
00061         return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
00062 #else
00063         return (EADDRINUSE == APR_TO_OS_ERROR(status));
00064 #endif
00065 }
00066 
00070 
00071 // static
00072 LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
00073 {
00074         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00075         LLSocket::ptr_t rv;
00076         apr_socket_t* socket = NULL;
00077         apr_pool_t* new_pool = NULL;
00078         apr_status_t status = APR_EGENERAL;
00079 
00080         // create a pool for the socket
00081         status = apr_pool_create(&new_pool, pool);
00082         if(ll_apr_warn_status(status))
00083         {
00084                 if(new_pool) apr_pool_destroy(new_pool);
00085                 return rv;
00086         }
00087 
00088         if(STREAM_TCP == type)
00089         {
00090                 status = apr_socket_create(
00091                         &socket,
00092                         APR_INET,
00093                         SOCK_STREAM,
00094                         APR_PROTO_TCP,
00095                         new_pool);
00096         }
00097         else if(DATAGRAM_UDP == type)
00098         {
00099                 status = apr_socket_create(
00100                         &socket,
00101                         APR_INET,
00102                         SOCK_DGRAM,
00103                         APR_PROTO_UDP,
00104                         new_pool);
00105         }
00106         else
00107         {
00108                 if(new_pool) apr_pool_destroy(new_pool);
00109                 return rv;
00110         }
00111         if(ll_apr_warn_status(status))
00112         {
00113                 if(new_pool) apr_pool_destroy(new_pool);
00114                 return rv;
00115         }
00116         rv = ptr_t(new LLSocket(socket, new_pool));
00117         if(port > 0)
00118         {
00119                 apr_sockaddr_t* sa = NULL;
00120                 status = apr_sockaddr_info_get(
00121                         &sa,
00122                         APR_ANYADDR,
00123                         APR_UNSPEC,
00124                         port,
00125                         0,
00126                         new_pool);
00127                 if(ll_apr_warn_status(status))
00128                 {
00129                         rv.reset();
00130                         return rv;
00131                 }
00132                 // This allows us to reuse the address on quick down/up. This
00133                 // is unlikely to create problems.
00134                 ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
00135                 status = apr_socket_bind(socket, sa);
00136                 if(ll_apr_warn_status(status))
00137                 {
00138                         rv.reset();
00139                         return rv;
00140                 }
00141                 lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
00142                                  << " socket to port: " << sa->port << llendl;
00143                 if(STREAM_TCP == type)
00144                 {
00145                         // If it's a stream based socket, we need to tell the OS
00146                         // to keep a queue of incoming connections for ACCEPT.
00147                         lldebugs << "Setting listen state for socket." << llendl;
00148                         status = apr_socket_listen(
00149                                 socket,
00150                                 LL_DEFAULT_LISTEN_BACKLOG);
00151                         if(ll_apr_warn_status(status))
00152                         {
00153                                 rv.reset();
00154                                 return rv;
00155                         }
00156                 }
00157         }
00158         else
00159         {
00160                 // we need to indicate that we have an ephemeral port if the
00161                 // previous calls were successful. It will
00162                 port = PORT_EPHEMERAL;
00163         }
00164         rv->mPort = port;
00165         rv->setOptions();
00166         return rv;
00167 }
00168 
00169 // static
00170 LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
00171 {
00172         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00173         LLSocket::ptr_t rv;
00174         if(!socket)
00175         {
00176                 return rv;
00177         }
00178         rv = ptr_t(new LLSocket(socket, pool));
00179         rv->mPort = PORT_EPHEMERAL;
00180         rv->setOptions();
00181         return rv;
00182 }
00183 
00184 
00185 bool LLSocket::blockingConnect(const LLHost& host)
00186 {
00187         if(!mSocket) return false;
00188         apr_sockaddr_t* sa = NULL;
00189         char ip_address[MAXADDRSTR];    /*Flawfinder: ignore*/
00190         host.getIPString(ip_address, MAXADDRSTR);
00191         if(ll_apr_warn_status(apr_sockaddr_info_get(
00192                 &sa,
00193                 ip_address,
00194                 APR_UNSPEC,
00195                 host.getPort(),
00196                 0,
00197                 mPool)))
00198         {
00199                 return false;
00200         }
00201         apr_socket_timeout_set(mSocket, 1000);
00202         if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
00203         setOptions();
00204         return true;
00205 }
00206 
00207 LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
00208         mSocket(socket),
00209         mPool(pool),
00210         mPort(PORT_INVALID)
00211 {
00212         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00213 }
00214 
00215 LLSocket::~LLSocket()
00216 {
00217         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00218         // *FIX: clean up memory we are holding.
00219         //lldebugs << "Destroying LLSocket" << llendl;
00220         if(mSocket)
00221         {
00222                 apr_socket_close(mSocket);
00223         }
00224         if(mPool)
00225         {
00226                 apr_pool_destroy(mPool);
00227         }
00228 }
00229 
00230 void LLSocket::setOptions()
00231 {
00232         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00233         // set up the socket options
00234         ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00235         ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00236         ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00237 
00238 }
00239 
00243 
00244 LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
00245         mSource(socket),
00246         mInitialized(false)
00247 {
00248         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00249 }
00250 
00251 LLIOSocketReader::~LLIOSocketReader()
00252 {
00253         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00254         //lldebugs << "Destroying LLIOSocketReader" << llendl;
00255 }
00256 
00257 // virtual
00258 LLIOPipe::EStatus LLIOSocketReader::process_impl(
00259         const LLChannelDescriptors& channels,
00260         buffer_ptr_t& buffer,
00261         bool& eos,
00262         LLSD& context,
00263         LLPumpIO* pump)
00264 {
00265         PUMP_DEBUG;
00266         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00267         if(!mSource) return STATUS_PRECONDITION_NOT_MET;
00268         if(!mInitialized)
00269         {
00270                 PUMP_DEBUG;
00271                 // Since the read will not block, it's ok to initialize and
00272                 // attempt to read off the descriptor immediately.
00273                 mInitialized = true;
00274                 if(pump)
00275                 {
00276                         PUMP_DEBUG;
00277                         lldebugs << "Initializing poll descriptor for LLIOSocketReader."
00278                                          << llendl;
00279                         apr_pollfd_t poll_fd;
00280                         poll_fd.p = NULL;
00281                         poll_fd.desc_type = APR_POLL_SOCKET;
00282                         poll_fd.reqevents = APR_POLLIN;
00283                         poll_fd.rtnevents = 0x0;
00284                         poll_fd.desc.s = mSource->getSocket();
00285                         poll_fd.client_data = NULL;
00286                         pump->setConditional(this, &poll_fd);
00287                 }
00288         }
00289         //if(!buffer)
00290         //{
00291         //      buffer = new LLBufferArray;
00292         //}
00293         PUMP_DEBUG;
00294         const apr_size_t READ_BUFFER_SIZE = 1024;
00295         char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
00296         apr_size_t len;
00297         apr_status_t status = APR_SUCCESS;
00298         do
00299         {
00300                 PUMP_DEBUG;
00301                 len = READ_BUFFER_SIZE;
00302                 status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
00303                 buffer->append(channels.out(), (U8*)read_buf, len);
00304         } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
00305         lldebugs << "socket read status: " << status << llendl;
00306         LLIOPipe::EStatus rv = STATUS_OK;
00307 
00308         PUMP_DEBUG;
00309         // *FIX: Also need to check for broken pipe
00310         if(APR_STATUS_IS_EOF(status))
00311         {
00312                 // *FIX: Should we shut down the socket read?
00313                 if(pump)
00314                 {
00315                         pump->setConditional(this, NULL);
00316                 }
00317                 rv = STATUS_DONE;
00318                 eos = true;
00319         }
00320         else if(APR_STATUS_IS_EAGAIN(status))
00321         {
00322                 // everything is fine, but we can terminate this process pump.
00323                 rv = STATUS_BREAK;
00324         }
00325         else
00326         {
00327                 if(ll_apr_warn_status(status))
00328                 {
00329                         rv = STATUS_ERROR;
00330                 }
00331         }
00332         PUMP_DEBUG;
00333         return rv;
00334 }
00335 
00339 
00340 LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
00341         mDestination(socket),
00342         mLastWritten(NULL),
00343         mInitialized(false)
00344 {
00345         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00346 }
00347 
00348 LLIOSocketWriter::~LLIOSocketWriter()
00349 {
00350         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00351         //lldebugs << "Destroying LLIOSocketWriter" << llendl;
00352 }
00353 
00354 // virtual
00355 LLIOPipe::EStatus LLIOSocketWriter::process_impl(
00356         const LLChannelDescriptors& channels,
00357         buffer_ptr_t& buffer,
00358         bool& eos,
00359         LLSD& context,
00360         LLPumpIO* pump)
00361 {
00362         PUMP_DEBUG;
00363         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00364         if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
00365         if(!mInitialized)
00366         {
00367                 PUMP_DEBUG;
00368                 // Since the write will not block, it's ok to initialize and
00369                 // attempt to write immediately.
00370                 mInitialized = true;
00371                 if(pump)
00372                 {
00373                         PUMP_DEBUG;
00374                         lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
00375                                          << llendl;
00376                         apr_pollfd_t poll_fd;
00377                         poll_fd.p = NULL;
00378                         poll_fd.desc_type = APR_POLL_SOCKET;
00379                         poll_fd.reqevents = APR_POLLOUT;
00380                         poll_fd.rtnevents = 0x0;
00381                         poll_fd.desc.s = mDestination->getSocket();
00382                         poll_fd.client_data = NULL;
00383                         pump->setConditional(this, &poll_fd);
00384                 }
00385         }
00386 
00387         PUMP_DEBUG;
00388         // *FIX: Some sort of writev implementation would be much more
00389         // efficient - not only because writev() is better, but also
00390         // because we won't have to do as much work to find the start
00391         // address.
00392         LLBufferArray::segment_iterator_t it;
00393         LLBufferArray::segment_iterator_t end = buffer->endSegment();
00394         LLSegment segment;
00395         it = buffer->constructSegmentAfter(mLastWritten, segment);
00396         /*
00397         if(NULL == mLastWritten)
00398         {
00399                 it = buffer->beginSegment();
00400                 segment = (*it);
00401         }
00402         else
00403         {
00404                 it = buffer->getSegment(mLastWritten);
00405                 segment = (*it);
00406                 S32 size = segment.size();
00407                 U8* data = segment.data();
00408                 if((data + size) == mLastWritten)
00409                 {
00410                         ++it;
00411                         segment = (*it);
00412                 }
00413                 else
00414                 {
00415                         // *FIX: check the math on this one
00416                         segment = LLSegment(
00417                                 (*it).getChannelMask(),
00418                                 mLastWritten + 1,
00419                                 size - (mLastWritten - data));
00420                 }
00421         }
00422         */
00423 
00424         PUMP_DEBUG;
00425         apr_size_t len;
00426         bool done = false;
00427         apr_status_t status = APR_SUCCESS;
00428         while(it != end)
00429         {
00430 
00431                 PUMP_DEBUG;
00432                 if((*it).isOnChannel(channels.in()))
00433                 {
00434                         PUMP_DEBUG;
00435                         len = (apr_size_t)segment.size();
00436                         status = apr_socket_send(
00437                                 mDestination->getSocket(),
00438                                 (const char*)segment.data(),
00439                                 &len);
00440                         // We sometimes get a 'non-blocking socket operation could not be 
00441                         // completed immediately' error from apr_socket_send.  In this
00442                         // case we break and the data will be sent the next time the chain
00443                         // is pumped.
00444                         if(APR_STATUS_IS_EAGAIN(status))
00445                         {
00446                                 ll_apr_warn_status(status);
00447                                 break;
00448                         }
00449 
00450                         mLastWritten = segment.data() + len - 1;
00451 
00452                         PUMP_DEBUG;
00453                         if((S32)len < segment.size())
00454                         {
00455                                 break;
00456                         }
00457                         
00458                 }
00459 
00460                 ++it;
00461                 if(it != end)
00462                 {
00463                         segment = (*it);
00464                 }
00465                 else
00466                 {
00467                         done = true;
00468                 }
00469 
00470         }
00471         PUMP_DEBUG;
00472         if(done && eos)
00473         {
00474                 return STATUS_DONE;
00475         }
00476         return STATUS_OK;
00477 }
00478 
00479 
00483 
00484 LLIOServerSocket::LLIOServerSocket(
00485         apr_pool_t* pool,
00486         LLIOServerSocket::socket_t listener,
00487         factory_t factory) :
00488         mPool(pool),
00489         mListenSocket(listener),
00490         mReactor(factory),
00491         mInitialized(false),
00492         mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
00493 {
00494         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00495 }
00496 
00497 LLIOServerSocket::~LLIOServerSocket()
00498 {
00499         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00500         //lldebugs << "Destroying LLIOServerSocket" << llendl;
00501 }
00502 
00503 void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
00504 {
00505         mResponseTimeout = timeout_secs;
00506 }
00507 
00508 // virtual
00509 LLIOPipe::EStatus LLIOServerSocket::process_impl(
00510         const LLChannelDescriptors& channels,
00511         buffer_ptr_t& buffer,
00512         bool& eos,
00513         LLSD& context,
00514         LLPumpIO* pump)
00515 {
00516         PUMP_DEBUG;
00517         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00518         if(!pump)
00519         {
00520                 llwarns << "Need a pump for server socket." << llendl;
00521                 return STATUS_ERROR;
00522         }
00523         if(!mInitialized)
00524         {
00525                 PUMP_DEBUG;
00526                 // This segment sets up the pump so that we do not call
00527                 // process again until we have an incoming read, aka connect()
00528                 // from a remote host.
00529                 lldebugs << "Initializing poll descriptor for LLIOServerSocket."
00530                                  << llendl;
00531                 apr_pollfd_t poll_fd;
00532                 poll_fd.p = NULL;
00533                 poll_fd.desc_type = APR_POLL_SOCKET;
00534                 poll_fd.reqevents = APR_POLLIN;
00535                 poll_fd.rtnevents = 0x0;
00536                 poll_fd.desc.s = mListenSocket->getSocket();
00537                 poll_fd.client_data = NULL;
00538                 pump->setConditional(this, &poll_fd);
00539                 mInitialized = true;
00540                 return STATUS_OK;
00541         }
00542 
00543         // we are initialized, and told to process, so we must have a
00544         // socket waiting for a connection.
00545         lldebugs << "accepting socket" << llendl;
00546 
00547         PUMP_DEBUG;
00548         apr_pool_t* new_pool = NULL;
00549         apr_status_t status = apr_pool_create(&new_pool, mPool);
00550         apr_socket_t* socket = NULL;
00551         status = apr_socket_accept(
00552                 &socket,
00553                 mListenSocket->getSocket(),
00554                 new_pool);
00555         LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
00556         //EStatus rv = STATUS_ERROR;
00557         if(llsocket)
00558         {
00559                 PUMP_DEBUG;
00560 
00561                 apr_sockaddr_t* remote_addr;
00562                 apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
00563                 
00564                 char* remote_host_string;
00565                 apr_sockaddr_ip_get(&remote_host_string, remote_addr);
00566 
00567                 LLSD context;
00568                 context["remote-host"] = remote_host_string;
00569                 context["remote-port"] = remote_addr->port;
00570 
00571                 LLPumpIO::chain_t chain;
00572                 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
00573                 if(mReactor->build(chain, context))
00574                 {
00575                         chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
00576                         pump->addChain(chain, mResponseTimeout);
00577                         status = STATUS_OK;
00578                 }
00579                 else
00580                 {
00581                         llwarns << "Unable to build reactor to socket." << llendl;
00582                 }
00583         }
00584         else
00585         {
00586                 llwarns << "Unable to create linden socket." << llendl;
00587         }
00588 
00589         PUMP_DEBUG;
00590         // This needs to always return success, lest it get removed from
00591         // the pump.
00592         return STATUS_OK;
00593 }
00594 
00595 
#if 0
// Disabled datagram socket implementation; compiled out by the
// surrounding #if 0.

/**
 * @brief Create a UDP socket bound to suggested_port, falling back to a
 * scan of up to 50 ports starting at start_discovery_port when the
 * suggested port is already in use.
 *
 * @param suggested_port Port to try first; PORT_INVALID aborts.
 * @param start_discovery_port First port of the fallback scan; 0
 * disables the scan.
 * @param pool APR pool the socket and address are allocated from.
 */
LLIODataSocket::LLIODataSocket(
	U16 suggested_port,
	U16 start_discovery_port,
	apr_pool_t* pool) : 
	mSocket(NULL)
{
	if(!pool || (PORT_INVALID == suggested_port)) return;

	apr_status_t status = apr_socket_create(
		&mSocket,
		APR_INET,
		SOCK_DGRAM,
		APR_PROTO_UDP,
		pool);
	if(ll_apr_warn_status(status)) return;

	apr_sockaddr_t* sa = NULL;
	status = apr_sockaddr_info_get(
		&sa,
		APR_ANYADDR,
		APR_UNSPEC,
		suggested_port,
		0,
		pool);
	if(ll_apr_warn_status(status)) return;

	status = apr_socket_bind(mSocket, sa);
	if((start_discovery_port > 0) && is_addr_in_use(status))
	{
		// The suggested port is taken: walk the discovery range until
		// a bind succeeds or the range is exhausted.
		const U16 MAX_ATTEMPT_PORTS = 50;
		for(U16 attempt_port = start_discovery_port;
			attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
			++attempt_port)
		{
			sa->port = attempt_port;
			sa->sa.sin.sin_port = htons(attempt_port);
			status = apr_socket_bind(mSocket, sa);
			if(APR_SUCCESS == status) break;
			if(!is_addr_in_use(status))
			{
				(void)ll_apr_warn_status(status);
			}
		}
	}
	if(ll_apr_warn_status(status)) return;

	if(sa->port)
	{
		lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
		mPort = sa->port;
	}
	else
	{
		mPort = LLIOSocket::PORT_EPHEMERAL;
	}

	// set up the socket options
	ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
	ll_apr_warn_status(
		apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
	ll_apr_warn_status(
		apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}

LLIODataSocket::~LLIODataSocket()
{
}


#endif

Generated on Thu Jul 1 06:08:47 2010 for Second Life Viewer by  doxygen 1.4.7