00001
00034 #include "linden_common.h"
00035 #include "lliosocket.h"
00036
00037 #include "llapr.h"
00038
00039 #include "llbuffer.h"
00040 #include "llhost.h"
00041 #include "llmemtype.h"
00042 #include "llpumpio.h"
00043
00044
00045
00046
00047
00048 static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10;
00049 static const S32 LL_SEND_BUFFER_SIZE = 40000;
00050 static const S32 LL_RECV_BUFFER_SIZE = 40000;
00051
00052
00053
00054
00055
00056
00057
00058 bool is_addr_in_use(apr_status_t status)
00059 {
00060 #if LL_WINDOWS
00061 return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
00062 #else
00063 return (EADDRINUSE == APR_TO_OS_ERROR(status));
00064 #endif
00065 }
00066
00067 #if LL_LINUX
00068
00069
00070 #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
00071 #include "apr-1/apr_portable.h"
00072 #endif
00073 #endif
00074
00075
00076
00077 void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
00078 {
00079 #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
00080 if(!apr_sock)
00081 {
00082 lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl;
00083 return;
00084 }
00085
00086
00087 int os_sock;
00088 if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
00089 {
00090 lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
00091 << " at " << apr_sock << llendl;
00092 }
00093 else
00094 {
00095 lldebugs << "Socket -- " << (msg?msg:"") << " no fd "
00096 << " at " << apr_sock << llendl;
00097 }
00098 #endif
00099 }
00100
00104
00105
00106 LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
00107 {
00108 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00109 LLSocket::ptr_t rv;
00110 apr_socket_t* socket = NULL;
00111 apr_pool_t* new_pool = NULL;
00112 apr_status_t status = APR_EGENERAL;
00113
00114
00115 status = apr_pool_create(&new_pool, pool);
00116 if(ll_apr_warn_status(status))
00117 {
00118 if(new_pool) apr_pool_destroy(new_pool);
00119 return rv;
00120 }
00121
00122 if(STREAM_TCP == type)
00123 {
00124 status = apr_socket_create(
00125 &socket,
00126 APR_INET,
00127 SOCK_STREAM,
00128 APR_PROTO_TCP,
00129 new_pool);
00130 }
00131 else if(DATAGRAM_UDP == type)
00132 {
00133 status = apr_socket_create(
00134 &socket,
00135 APR_INET,
00136 SOCK_DGRAM,
00137 APR_PROTO_UDP,
00138 new_pool);
00139 }
00140 else
00141 {
00142 if(new_pool) apr_pool_destroy(new_pool);
00143 return rv;
00144 }
00145 if(ll_apr_warn_status(status))
00146 {
00147 if(new_pool) apr_pool_destroy(new_pool);
00148 return rv;
00149 }
00150 rv = ptr_t(new LLSocket(socket, new_pool));
00151 if(port > 0)
00152 {
00153 apr_sockaddr_t* sa = NULL;
00154 status = apr_sockaddr_info_get(
00155 &sa,
00156 APR_ANYADDR,
00157 APR_UNSPEC,
00158 port,
00159 0,
00160 new_pool);
00161 if(ll_apr_warn_status(status))
00162 {
00163 rv.reset();
00164 return rv;
00165 }
00166
00167
00168 ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
00169 status = apr_socket_bind(socket, sa);
00170 if(ll_apr_warn_status(status))
00171 {
00172 rv.reset();
00173 return rv;
00174 }
00175 lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
00176 << " socket to port: " << sa->port << llendl;
00177 if(STREAM_TCP == type)
00178 {
00179
00180
00181 lldebugs << "Setting listen state for socket." << llendl;
00182 status = apr_socket_listen(
00183 socket,
00184 LL_DEFAULT_LISTEN_BACKLOG);
00185 if(ll_apr_warn_status(status))
00186 {
00187 rv.reset();
00188 return rv;
00189 }
00190 }
00191 }
00192 else
00193 {
00194
00195
00196 port = PORT_EPHEMERAL;
00197 }
00198 rv->mPort = port;
00199 rv->setOptions();
00200 return rv;
00201 }
00202
00203
00204 LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
00205 {
00206 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00207 LLSocket::ptr_t rv;
00208 if(!socket)
00209 {
00210 return rv;
00211 }
00212 rv = ptr_t(new LLSocket(socket, pool));
00213 rv->mPort = PORT_EPHEMERAL;
00214 rv->setOptions();
00215 return rv;
00216 }
00217
00218
00219 bool LLSocket::blockingConnect(const LLHost& host)
00220 {
00221 if(!mSocket) return false;
00222 apr_sockaddr_t* sa = NULL;
00223 char ip_address[MAXADDRSTR];
00224 host.getIPString(ip_address, MAXADDRSTR);
00225 if(ll_apr_warn_status(apr_sockaddr_info_get(
00226 &sa,
00227 ip_address,
00228 APR_UNSPEC,
00229 host.getPort(),
00230 0,
00231 mPool)))
00232 {
00233 return false;
00234 }
00235 apr_socket_timeout_set(mSocket, 1000);
00236 ll_debug_socket("Blocking connect", mSocket);
00237 if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
00238 setOptions();
00239 return true;
00240 }
00241
00242 LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
00243 mSocket(socket),
00244 mPool(pool),
00245 mPort(PORT_INVALID)
00246 {
00247 ll_debug_socket("Constructing wholely formed socket", mSocket);
00248 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00249 }
00250
00251 LLSocket::~LLSocket()
00252 {
00253 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00254
00255 if(mSocket)
00256 {
00257 ll_debug_socket("Destroying socket", mSocket);
00258 apr_socket_close(mSocket);
00259 }
00260 if(mPool)
00261 {
00262 apr_pool_destroy(mPool);
00263 }
00264 }
00265
00266 void LLSocket::setOptions()
00267 {
00268 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00269
00270 ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00271 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00272 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00273
00274 }
00275
00279
00280 LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
00281 mSource(socket),
00282 mInitialized(false)
00283 {
00284 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00285 }
00286
00287 LLIOSocketReader::~LLIOSocketReader()
00288 {
00289 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00290
00291 }
00292
00293
00294 LLIOPipe::EStatus LLIOSocketReader::process_impl(
00295 const LLChannelDescriptors& channels,
00296 buffer_ptr_t& buffer,
00297 bool& eos,
00298 LLSD& context,
00299 LLPumpIO* pump)
00300 {
00301 PUMP_DEBUG;
00302 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00303 if(!mSource) return STATUS_PRECONDITION_NOT_MET;
00304 if(!mInitialized)
00305 {
00306 PUMP_DEBUG;
00307
00308
00309 mInitialized = true;
00310 if(pump)
00311 {
00312 PUMP_DEBUG;
00313 lldebugs << "Initializing poll descriptor for LLIOSocketReader."
00314 << llendl;
00315 apr_pollfd_t poll_fd;
00316 poll_fd.p = NULL;
00317 poll_fd.desc_type = APR_POLL_SOCKET;
00318 poll_fd.reqevents = APR_POLLIN;
00319 poll_fd.rtnevents = 0x0;
00320 poll_fd.desc.s = mSource->getSocket();
00321 poll_fd.client_data = NULL;
00322 pump->setConditional(this, &poll_fd);
00323 }
00324 }
00325
00326
00327
00328
00329 PUMP_DEBUG;
00330 const apr_size_t READ_BUFFER_SIZE = 1024;
00331 char read_buf[READ_BUFFER_SIZE];
00332 apr_size_t len;
00333 apr_status_t status = APR_SUCCESS;
00334 do
00335 {
00336 PUMP_DEBUG;
00337 len = READ_BUFFER_SIZE;
00338 status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
00339 buffer->append(channels.out(), (U8*)read_buf, len);
00340 } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
00341 lldebugs << "socket read status: " << status << llendl;
00342 LLIOPipe::EStatus rv = STATUS_OK;
00343
00344 PUMP_DEBUG;
00345
00346 if(APR_STATUS_IS_EOF(status))
00347 {
00348
00349 if(pump)
00350 {
00351 pump->setConditional(this, NULL);
00352 }
00353 rv = STATUS_DONE;
00354 eos = true;
00355 }
00356 else if(APR_STATUS_IS_EAGAIN(status))
00357 {
00358
00359 rv = STATUS_BREAK;
00360 }
00361 else
00362 {
00363 if(ll_apr_warn_status(status))
00364 {
00365 rv = STATUS_ERROR;
00366 }
00367 }
00368 PUMP_DEBUG;
00369 return rv;
00370 }
00371
00375
00376 LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
00377 mDestination(socket),
00378 mLastWritten(NULL),
00379 mInitialized(false)
00380 {
00381 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00382 }
00383
00384 LLIOSocketWriter::~LLIOSocketWriter()
00385 {
00386 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00387
00388 }
00389
00390
00391 LLIOPipe::EStatus LLIOSocketWriter::process_impl(
00392 const LLChannelDescriptors& channels,
00393 buffer_ptr_t& buffer,
00394 bool& eos,
00395 LLSD& context,
00396 LLPumpIO* pump)
00397 {
00398 PUMP_DEBUG;
00399 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00400 if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
00401 if(!mInitialized)
00402 {
00403 PUMP_DEBUG;
00404
00405
00406 mInitialized = true;
00407 if(pump)
00408 {
00409 PUMP_DEBUG;
00410 lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
00411 << llendl;
00412 apr_pollfd_t poll_fd;
00413 poll_fd.p = NULL;
00414 poll_fd.desc_type = APR_POLL_SOCKET;
00415 poll_fd.reqevents = APR_POLLOUT;
00416 poll_fd.rtnevents = 0x0;
00417 poll_fd.desc.s = mDestination->getSocket();
00418 poll_fd.client_data = NULL;
00419 pump->setConditional(this, &poll_fd);
00420 }
00421 }
00422
00423 PUMP_DEBUG;
00424
00425
00426
00427
00428 LLBufferArray::segment_iterator_t it;
00429 LLBufferArray::segment_iterator_t end = buffer->endSegment();
00430 LLSegment segment;
00431 it = buffer->constructSegmentAfter(mLastWritten, segment);
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 PUMP_DEBUG;
00461 apr_size_t len;
00462 bool done = false;
00463 apr_status_t status = APR_SUCCESS;
00464 while(it != end)
00465 {
00466
00467 PUMP_DEBUG;
00468 if((*it).isOnChannel(channels.in()))
00469 {
00470 PUMP_DEBUG;
00471 len = (apr_size_t)segment.size();
00472 status = apr_socket_send(
00473 mDestination->getSocket(),
00474 (const char*)segment.data(),
00475 &len);
00476
00477
00478
00479
00480 if(APR_STATUS_IS_EAGAIN(status))
00481 {
00482 ll_apr_warn_status(status);
00483 break;
00484 }
00485
00486 mLastWritten = segment.data() + len - 1;
00487
00488 PUMP_DEBUG;
00489 if((S32)len < segment.size())
00490 {
00491 break;
00492 }
00493
00494 }
00495
00496 ++it;
00497 if(it != end)
00498 {
00499 segment = (*it);
00500 }
00501 else
00502 {
00503 done = true;
00504 }
00505
00506 }
00507 PUMP_DEBUG;
00508 if(done && eos)
00509 {
00510 return STATUS_DONE;
00511 }
00512 return STATUS_OK;
00513 }
00514
00515
00519
00520 LLIOServerSocket::LLIOServerSocket(
00521 apr_pool_t* pool,
00522 LLIOServerSocket::socket_t listener,
00523 factory_t factory) :
00524 mPool(pool),
00525 mListenSocket(listener),
00526 mReactor(factory),
00527 mInitialized(false),
00528 mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
00529 {
00530 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00531 }
00532
00533 LLIOServerSocket::~LLIOServerSocket()
00534 {
00535 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00536
00537 }
00538
00539 void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
00540 {
00541 mResponseTimeout = timeout_secs;
00542 }
00543
00544
00545 LLIOPipe::EStatus LLIOServerSocket::process_impl(
00546 const LLChannelDescriptors& channels,
00547 buffer_ptr_t& buffer,
00548 bool& eos,
00549 LLSD& context,
00550 LLPumpIO* pump)
00551 {
00552 PUMP_DEBUG;
00553 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00554 if(!pump)
00555 {
00556 llwarns << "Need a pump for server socket." << llendl;
00557 return STATUS_ERROR;
00558 }
00559 if(!mInitialized)
00560 {
00561 PUMP_DEBUG;
00562
00563
00564
00565 lldebugs << "Initializing poll descriptor for LLIOServerSocket."
00566 << llendl;
00567 apr_pollfd_t poll_fd;
00568 poll_fd.p = NULL;
00569 poll_fd.desc_type = APR_POLL_SOCKET;
00570 poll_fd.reqevents = APR_POLLIN;
00571 poll_fd.rtnevents = 0x0;
00572 poll_fd.desc.s = mListenSocket->getSocket();
00573 poll_fd.client_data = NULL;
00574 pump->setConditional(this, &poll_fd);
00575 mInitialized = true;
00576 return STATUS_OK;
00577 }
00578
00579
00580
00581 lldebugs << "accepting socket" << llendl;
00582
00583 PUMP_DEBUG;
00584 apr_pool_t* new_pool = NULL;
00585 apr_status_t status = apr_pool_create(&new_pool, mPool);
00586 apr_socket_t* socket = NULL;
00587 status = apr_socket_accept(
00588 &socket,
00589 mListenSocket->getSocket(),
00590 new_pool);
00591 LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
00592
00593 if(llsocket)
00594 {
00595 PUMP_DEBUG;
00596
00597 apr_sockaddr_t* remote_addr;
00598 apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
00599
00600 char* remote_host_string;
00601 apr_sockaddr_ip_get(&remote_host_string, remote_addr);
00602
00603 LLSD context;
00604 context["remote-host"] = remote_host_string;
00605 context["remote-port"] = remote_addr->port;
00606
00607 LLPumpIO::chain_t chain;
00608 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
00609 if(mReactor->build(chain, context))
00610 {
00611 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
00612 pump->addChain(chain, mResponseTimeout);
00613 status = STATUS_OK;
00614 }
00615 else
00616 {
00617 llwarns << "Unable to build reactor to socket." << llendl;
00618 }
00619 }
00620 else
00621 {
00622 llwarns << "Unable to create linden socket." << llendl;
00623 }
00624
00625 PUMP_DEBUG;
00626
00627
00628 return STATUS_OK;
00629 }
00630
00631
00632 #if 0
00633 LLIODataSocket::LLIODataSocket(
00634 U16 suggested_port,
00635 U16 start_discovery_port,
00636 apr_pool_t* pool) :
00637 mSocket(NULL)
00638 {
00639 if(!pool || (PORT_INVALID == suggested_port)) return;
00640 if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
00641 apr_sockaddr_t* sa = NULL;
00642 if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
00643 apr_status_t status = apr_socket_bind(mSocket, sa);
00644 if((start_discovery_port > 0) && is_addr_in_use(status))
00645 {
00646 const U16 MAX_ATTEMPT_PORTS = 50;
00647 for(U16 attempt_port = start_discovery_port;
00648 attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
00649 ++attempt_port)
00650 {
00651 sa->port = attempt_port;
00652 sa->sa.sin.sin_port = htons(attempt_port);
00653 status = apr_socket_bind(mSocket, sa);
00654 if(APR_SUCCESS == status) break;
00655 if(is_addr_in_use(status)) continue;
00656 (void)ll_apr_warn_status(status);
00657 }
00658 }
00659 if(ll_apr_warn_status(status)) return;
00660 if(sa->port)
00661 {
00662 lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
00663 mPort = sa->port;
00664 }
00665 else
00666 {
00667 mPort = LLIOSocket::PORT_EPHEMERAL;
00668 }
00669
00670
00671 ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00672 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00673 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00674 }
00675
00676 LLIODataSocket::~LLIODataSocket()
00677 {
00678 }
00679
00680
00681 #endif