lliosocket.cpp

Go to the documentation of this file.
00001 
00034 #include "linden_common.h"
00035 #include "lliosocket.h"
00036 
00037 #include "llapr.h"
00038 
00039 #include "llbuffer.h"
00040 #include "llhost.h"
00041 #include "llmemtype.h"
00042 #include "llpumpio.h"
00043 
00044 //
00045 // constants
00046 //
00047 
00048 static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10; // backlog handed to apr_socket_listen() in LLSocket::create()
00049 static const S32 LL_SEND_BUFFER_SIZE = 40000; // value applied to APR_SO_SNDBUF by LLSocket::setOptions()
00050 static const S32 LL_RECV_BUFFER_SIZE = 40000; // value applied to APR_SO_RCVBUF by LLSocket::setOptions()
00051 //static const U16 LL_PORT_DISCOVERY_RANGE_MIN = 13000;
00052 //static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050;
00053 
00054 //
00055 // local methods 
00056 //
00057 
00058 bool is_addr_in_use(apr_status_t status)
00059 {
00060 #if LL_WINDOWS
00061         return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
00062 #else
00063         return (EADDRINUSE == APR_TO_OS_ERROR(status));
00064 #endif
00065 }
00066 
00067 #if LL_LINUX
00068 // Define this to see the actual file descriptors being tossed around.
00069 //#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1
00070 #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
00071 #include "apr-1/apr_portable.h"
00072 #endif
00073 #endif
00074 
00075 
00076 // Quick function 
00077 void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
00078 {
00079 #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
00080         if(!apr_sock)
00081         {
00082                 lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl;
00083                 return;
00084         }
00085         // *TODO: Why doesn't this work?
00086         //apr_os_sock_t os_sock;
00087         int os_sock;
00088         if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
00089         {
00090                 lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
00091                         << " at " << apr_sock << llendl;
00092         }
00093         else
00094         {
00095                 lldebugs << "Socket -- " << (msg?msg:"") << " no fd "
00096                         << " at " << apr_sock << llendl;
00097         }
00098 #endif
00099 }
00100 
00104 
00105 // static
00106 LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
00107 {
00108         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00109         LLSocket::ptr_t rv;
00110         apr_socket_t* socket = NULL;
00111         apr_pool_t* new_pool = NULL;
00112         apr_status_t status = APR_EGENERAL;
00113 
00114         // create a pool for the socket
00115         status = apr_pool_create(&new_pool, pool);
00116         if(ll_apr_warn_status(status))
00117         {
00118                 if(new_pool) apr_pool_destroy(new_pool);
00119                 return rv;
00120         }
00121 
00122         if(STREAM_TCP == type)
00123         {
00124                 status = apr_socket_create(
00125                         &socket,
00126                         APR_INET,
00127                         SOCK_STREAM,
00128                         APR_PROTO_TCP,
00129                         new_pool);
00130         }
00131         else if(DATAGRAM_UDP == type)
00132         {
00133                 status = apr_socket_create(
00134                         &socket,
00135                         APR_INET,
00136                         SOCK_DGRAM,
00137                         APR_PROTO_UDP,
00138                         new_pool);
00139         }
00140         else
00141         {
00142                 if(new_pool) apr_pool_destroy(new_pool);
00143                 return rv;
00144         }
00145         if(ll_apr_warn_status(status))
00146         {
00147                 if(new_pool) apr_pool_destroy(new_pool);
00148                 return rv;
00149         }
00150         rv = ptr_t(new LLSocket(socket, new_pool));
00151         if(port > 0)
00152         {
00153                 apr_sockaddr_t* sa = NULL;
00154                 status = apr_sockaddr_info_get(
00155                         &sa,
00156                         APR_ANYADDR,
00157                         APR_UNSPEC,
00158                         port,
00159                         0,
00160                         new_pool);
00161                 if(ll_apr_warn_status(status))
00162                 {
00163                         rv.reset();
00164                         return rv;
00165                 }
00166                 // This allows us to reuse the address on quick down/up. This
00167                 // is unlikely to create problems.
00168                 ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
00169                 status = apr_socket_bind(socket, sa);
00170                 if(ll_apr_warn_status(status))
00171                 {
00172                         rv.reset();
00173                         return rv;
00174                 }
00175                 lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
00176                                  << " socket to port: " << sa->port << llendl;
00177                 if(STREAM_TCP == type)
00178                 {
00179                         // If it's a stream based socket, we need to tell the OS
00180                         // to keep a queue of incoming connections for ACCEPT.
00181                         lldebugs << "Setting listen state for socket." << llendl;
00182                         status = apr_socket_listen(
00183                                 socket,
00184                                 LL_DEFAULT_LISTEN_BACKLOG);
00185                         if(ll_apr_warn_status(status))
00186                         {
00187                                 rv.reset();
00188                                 return rv;
00189                         }
00190                 }
00191         }
00192         else
00193         {
00194                 // we need to indicate that we have an ephemeral port if the
00195                 // previous calls were successful. It will
00196                 port = PORT_EPHEMERAL;
00197         }
00198         rv->mPort = port;
00199         rv->setOptions();
00200         return rv;
00201 }
00202 
00203 // static
00204 LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
00205 {
00206         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00207         LLSocket::ptr_t rv;
00208         if(!socket)
00209         {
00210                 return rv;
00211         }
00212         rv = ptr_t(new LLSocket(socket, pool));
00213         rv->mPort = PORT_EPHEMERAL;
00214         rv->setOptions();
00215         return rv;
00216 }
00217 
00218 
00219 bool LLSocket::blockingConnect(const LLHost& host)
00220 {
00221         if(!mSocket) return false;
00222         apr_sockaddr_t* sa = NULL;
00223         char ip_address[MAXADDRSTR];    /*Flawfinder: ignore*/
00224         host.getIPString(ip_address, MAXADDRSTR);
00225         if(ll_apr_warn_status(apr_sockaddr_info_get(
00226                 &sa,
00227                 ip_address,
00228                 APR_UNSPEC,
00229                 host.getPort(),
00230                 0,
00231                 mPool)))
00232         {
00233                 return false;
00234         }
00235         apr_socket_timeout_set(mSocket, 1000);
00236         ll_debug_socket("Blocking connect", mSocket);
00237         if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
00238         setOptions();
00239         return true;
00240 }
00241 
00242 LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
00243         mSocket(socket),
00244         mPool(pool),
00245         mPort(PORT_INVALID)
00246 {
00247         ll_debug_socket("Constructing wholely formed socket", mSocket);
00248         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00249 }
00250 
00251 LLSocket::~LLSocket()
00252 {
00253         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00254         // *FIX: clean up memory we are holding.
00255         if(mSocket)
00256         {
00257                 ll_debug_socket("Destroying socket", mSocket);
00258                 apr_socket_close(mSocket);
00259         }
00260         if(mPool)
00261         {
00262                 apr_pool_destroy(mPool);
00263         }
00264 }
00265 
00266 void LLSocket::setOptions()
00267 {
00268         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00269         // set up the socket options
00270         ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00271         ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00272         ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00273 
00274 }
00275 
00279 
00280 LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
00281         mSource(socket),
00282         mInitialized(false)
00283 {
00284         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00285 }
00286 
00287 LLIOSocketReader::~LLIOSocketReader()
00288 {
00289         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00290         //lldebugs << "Destroying LLIOSocketReader" << llendl;
00291 }
00292 
00293 // virtual
00294 LLIOPipe::EStatus LLIOSocketReader::process_impl(
00295         const LLChannelDescriptors& channels,
00296         buffer_ptr_t& buffer,
00297         bool& eos,
00298         LLSD& context,
00299         LLPumpIO* pump)
00300 {
00301         PUMP_DEBUG;
00302         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00303         if(!mSource) return STATUS_PRECONDITION_NOT_MET;
00304         if(!mInitialized)
00305         {
00306                 PUMP_DEBUG;
00307                 // Since the read will not block, it's ok to initialize and
00308                 // attempt to read off the descriptor immediately.
00309                 mInitialized = true;
00310                 if(pump)
00311                 {
00312                         PUMP_DEBUG;
00313                         lldebugs << "Initializing poll descriptor for LLIOSocketReader."
00314                                          << llendl;
00315                         apr_pollfd_t poll_fd;
00316                         poll_fd.p = NULL;
00317                         poll_fd.desc_type = APR_POLL_SOCKET;
00318                         poll_fd.reqevents = APR_POLLIN;
00319                         poll_fd.rtnevents = 0x0;
00320                         poll_fd.desc.s = mSource->getSocket();
00321                         poll_fd.client_data = NULL;
00322                         pump->setConditional(this, &poll_fd);
00323                 }
00324         }
00325         //if(!buffer)
00326         //{
00327         //      buffer = new LLBufferArray;
00328         //}
00329         PUMP_DEBUG;
00330         const apr_size_t READ_BUFFER_SIZE = 1024;
00331         char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
00332         apr_size_t len;
00333         apr_status_t status = APR_SUCCESS;
00334         do
00335         {
00336                 PUMP_DEBUG;
00337                 len = READ_BUFFER_SIZE;
00338                 status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
00339                 buffer->append(channels.out(), (U8*)read_buf, len);
00340         } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
00341         lldebugs << "socket read status: " << status << llendl;
00342         LLIOPipe::EStatus rv = STATUS_OK;
00343 
00344         PUMP_DEBUG;
00345         // *FIX: Also need to check for broken pipe
00346         if(APR_STATUS_IS_EOF(status))
00347         {
00348                 // *FIX: Should we shut down the socket read?
00349                 if(pump)
00350                 {
00351                         pump->setConditional(this, NULL);
00352                 }
00353                 rv = STATUS_DONE;
00354                 eos = true;
00355         }
00356         else if(APR_STATUS_IS_EAGAIN(status))
00357         {
00358                 // everything is fine, but we can terminate this process pump.
00359                 rv = STATUS_BREAK;
00360         }
00361         else
00362         {
00363                 if(ll_apr_warn_status(status))
00364                 {
00365                         rv = STATUS_ERROR;
00366                 }
00367         }
00368         PUMP_DEBUG;
00369         return rv;
00370 }
00371 
00375 
00376 LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
00377         mDestination(socket),
00378         mLastWritten(NULL),
00379         mInitialized(false)
00380 {
00381         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00382 }
00383 
00384 LLIOSocketWriter::~LLIOSocketWriter()
00385 {
00386         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00387         //lldebugs << "Destroying LLIOSocketWriter" << llendl;
00388 }
00389 
00390 // virtual
00391 LLIOPipe::EStatus LLIOSocketWriter::process_impl(
00392         const LLChannelDescriptors& channels,
00393         buffer_ptr_t& buffer,
00394         bool& eos,
00395         LLSD& context,
00396         LLPumpIO* pump)
00397 {
00398         PUMP_DEBUG;
00399         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00400         if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
00401         if(!mInitialized)
00402         {
00403                 PUMP_DEBUG;
00404                 // Since the write will not block, it's ok to initialize and
00405                 // attempt to write immediately.
00406                 mInitialized = true;
00407                 if(pump)
00408                 {
00409                         PUMP_DEBUG;
00410                         lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
00411                                          << llendl;
00412                         apr_pollfd_t poll_fd;
00413                         poll_fd.p = NULL;
00414                         poll_fd.desc_type = APR_POLL_SOCKET;
00415                         poll_fd.reqevents = APR_POLLOUT;
00416                         poll_fd.rtnevents = 0x0;
00417                         poll_fd.desc.s = mDestination->getSocket();
00418                         poll_fd.client_data = NULL;
00419                         pump->setConditional(this, &poll_fd);
00420                 }
00421         }
00422 
00423         PUMP_DEBUG;
00424         // *FIX: Some sort of writev implementation would be much more
00425         // efficient - not only because writev() is better, but also
00426         // because we won't have to do as much work to find the start
00427         // address.
00428         LLBufferArray::segment_iterator_t it;
00429         LLBufferArray::segment_iterator_t end = buffer->endSegment();
00430         LLSegment segment;
00431         it = buffer->constructSegmentAfter(mLastWritten, segment);
00432         /*
00433         if(NULL == mLastWritten)
00434         {
00435                 it = buffer->beginSegment();
00436                 segment = (*it);
00437         }
00438         else
00439         {
00440                 it = buffer->getSegment(mLastWritten);
00441                 segment = (*it);
00442                 S32 size = segment.size();
00443                 U8* data = segment.data();
00444                 if((data + size) == mLastWritten)
00445                 {
00446                         ++it;
00447                         segment = (*it);
00448                 }
00449                 else
00450                 {
00451                         // *FIX: check the math on this one
00452                         segment = LLSegment(
00453                                 (*it).getChannelMask(),
00454                                 mLastWritten + 1,
00455                                 size - (mLastWritten - data));
00456                 }
00457         }
00458         */
00459 
00460         PUMP_DEBUG;
00461         apr_size_t len;
00462         bool done = false;
00463         apr_status_t status = APR_SUCCESS;
00464         while(it != end)
00465         {
00466 
00467                 PUMP_DEBUG;
00468                 if((*it).isOnChannel(channels.in()))
00469                 {
00470                         PUMP_DEBUG;
00471                         len = (apr_size_t)segment.size();
00472                         status = apr_socket_send(
00473                                 mDestination->getSocket(),
00474                                 (const char*)segment.data(),
00475                                 &len);
00476                         // We sometimes get a 'non-blocking socket operation could not be 
00477                         // completed immediately' error from apr_socket_send.  In this
00478                         // case we break and the data will be sent the next time the chain
00479                         // is pumped.
00480                         if(APR_STATUS_IS_EAGAIN(status))
00481                         {
00482                                 ll_apr_warn_status(status);
00483                                 break;
00484                         }
00485 
00486                         mLastWritten = segment.data() + len - 1;
00487 
00488                         PUMP_DEBUG;
00489                         if((S32)len < segment.size())
00490                         {
00491                                 break;
00492                         }
00493                         
00494                 }
00495 
00496                 ++it;
00497                 if(it != end)
00498                 {
00499                         segment = (*it);
00500                 }
00501                 else
00502                 {
00503                         done = true;
00504                 }
00505 
00506         }
00507         PUMP_DEBUG;
00508         if(done && eos)
00509         {
00510                 return STATUS_DONE;
00511         }
00512         return STATUS_OK;
00513 }
00514 
00515 
00519 
00520 LLIOServerSocket::LLIOServerSocket(
00521         apr_pool_t* pool,
00522         LLIOServerSocket::socket_t listener,
00523         factory_t factory) :
00524         mPool(pool),
00525         mListenSocket(listener),
00526         mReactor(factory),
00527         mInitialized(false),
00528         mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
00529 {
00530         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00531 }
00532 
00533 LLIOServerSocket::~LLIOServerSocket()
00534 {
00535         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00536         //lldebugs << "Destroying LLIOServerSocket" << llendl;
00537 }
00538 
00539 void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
00540 {
00541         mResponseTimeout = timeout_secs;
00542 }
00543 
00544 // virtual
00545 LLIOPipe::EStatus LLIOServerSocket::process_impl(
00546         const LLChannelDescriptors& channels,
00547         buffer_ptr_t& buffer,
00548         bool& eos,
00549         LLSD& context,
00550         LLPumpIO* pump)
00551 {
00552         PUMP_DEBUG;
00553         LLMemType m1(LLMemType::MTYPE_IO_TCP);
00554         if(!pump)
00555         {
00556                 llwarns << "Need a pump for server socket." << llendl;
00557                 return STATUS_ERROR;
00558         }
00559         if(!mInitialized)
00560         {
00561                 PUMP_DEBUG;
00562                 // This segment sets up the pump so that we do not call
00563                 // process again until we have an incoming read, aka connect()
00564                 // from a remote host.
00565                 lldebugs << "Initializing poll descriptor for LLIOServerSocket."
00566                                  << llendl;
00567                 apr_pollfd_t poll_fd;
00568                 poll_fd.p = NULL;
00569                 poll_fd.desc_type = APR_POLL_SOCKET;
00570                 poll_fd.reqevents = APR_POLLIN;
00571                 poll_fd.rtnevents = 0x0;
00572                 poll_fd.desc.s = mListenSocket->getSocket();
00573                 poll_fd.client_data = NULL;
00574                 pump->setConditional(this, &poll_fd);
00575                 mInitialized = true;
00576                 return STATUS_OK;
00577         }
00578 
00579         // we are initialized, and told to process, so we must have a
00580         // socket waiting for a connection.
00581         lldebugs << "accepting socket" << llendl;
00582 
00583         PUMP_DEBUG;
00584         apr_pool_t* new_pool = NULL;
00585         apr_status_t status = apr_pool_create(&new_pool, mPool);
00586         apr_socket_t* socket = NULL;
00587         status = apr_socket_accept(
00588                 &socket,
00589                 mListenSocket->getSocket(),
00590                 new_pool);
00591         LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
00592         //EStatus rv = STATUS_ERROR;
00593         if(llsocket)
00594         {
00595                 PUMP_DEBUG;
00596 
00597                 apr_sockaddr_t* remote_addr;
00598                 apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
00599                 
00600                 char* remote_host_string;
00601                 apr_sockaddr_ip_get(&remote_host_string, remote_addr);
00602 
00603                 LLSD context;
00604                 context["remote-host"] = remote_host_string;
00605                 context["remote-port"] = remote_addr->port;
00606 
00607                 LLPumpIO::chain_t chain;
00608                 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
00609                 if(mReactor->build(chain, context))
00610                 {
00611                         chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
00612                         pump->addChain(chain, mResponseTimeout);
00613                         status = STATUS_OK;
00614                 }
00615                 else
00616                 {
00617                         llwarns << "Unable to build reactor to socket." << llendl;
00618                 }
00619         }
00620         else
00621         {
00622                 llwarns << "Unable to create linden socket." << llendl;
00623         }
00624 
00625         PUMP_DEBUG;
00626         // This needs to always return success, lest it get removed from
00627         // the pump.
00628         return STATUS_OK;
00629 }
00630 
00631 
00632 #if 0
00633 LLIODataSocket::LLIODataSocket(
00634         U16 suggested_port,
00635         U16 start_discovery_port,
00636         apr_pool_t* pool) : 
00637         mSocket(NULL)
00638 {
00639         if(!pool || (PORT_INVALID == suggested_port)) return;
00640         if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
00641         apr_sockaddr_t* sa = NULL;
00642         if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
00643         apr_status_t status = apr_socket_bind(mSocket, sa);
00644         if((start_discovery_port > 0) && is_addr_in_use(status))
00645         {
00646                 const U16 MAX_ATTEMPT_PORTS = 50;
00647                 for(U16 attempt_port = start_discovery_port;
00648                         attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
00649                         ++attempt_port)
00650                 {
00651                         sa->port = attempt_port;
00652                         sa->sa.sin.sin_port = htons(attempt_port);
00653                         status = apr_socket_bind(mSocket, sa);
00654                         if(APR_SUCCESS == status) break;
00655                         if(is_addr_in_use(status)) continue;
00656                         (void)ll_apr_warn_status(status);
00657                 }
00658         }
00659         if(ll_apr_warn_status(status)) return;
00660         if(sa->port)
00661         {
00662                 lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
00663                 mPort = sa->port;
00664         }
00665         else
00666         {
00667                 mPort = LLIOSocket::PORT_EPHEMERAL;
00668         }
00669 
00670         // set up the socket options options
00671         ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00672         ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00673         ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00674 }
00675 
00676 LLIODataSocket::~LLIODataSocket()
00677 {
00678 }
00679 
00680 
00681 #endif

Generated on Fri May 16 08:32:26 2008 for SecondLife by  doxygen 1.5.5