00001
00034 #include "linden_common.h"
00035 #include "lliosocket.h"
00036
00037 #include "llapr.h"
00038
00039 #include "llbuffer.h"
00040 #include "llhost.h"
00041 #include "llmemtype.h"
00042 #include "llpumpio.h"
00043
00044
00045
00046
00047
00048 static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10;
00049 static const S32 LL_SEND_BUFFER_SIZE = 40000;
00050 static const S32 LL_RECV_BUFFER_SIZE = 40000;
00051
00052
00053
00054
00055
00056
00057
00058 bool is_addr_in_use(apr_status_t status)
00059 {
00060 #if LL_WINDOWS
00061 return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
00062 #else
00063 return (EADDRINUSE == APR_TO_OS_ERROR(status));
00064 #endif
00065 }
00066
00070
00071
00072 LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
00073 {
00074 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00075 LLSocket::ptr_t rv;
00076 apr_socket_t* socket = NULL;
00077 apr_pool_t* new_pool = NULL;
00078 apr_status_t status = APR_EGENERAL;
00079
00080
00081 status = apr_pool_create(&new_pool, pool);
00082 if(ll_apr_warn_status(status))
00083 {
00084 if(new_pool) apr_pool_destroy(new_pool);
00085 return rv;
00086 }
00087
00088 if(STREAM_TCP == type)
00089 {
00090 status = apr_socket_create(
00091 &socket,
00092 APR_INET,
00093 SOCK_STREAM,
00094 APR_PROTO_TCP,
00095 new_pool);
00096 }
00097 else if(DATAGRAM_UDP == type)
00098 {
00099 status = apr_socket_create(
00100 &socket,
00101 APR_INET,
00102 SOCK_DGRAM,
00103 APR_PROTO_UDP,
00104 new_pool);
00105 }
00106 else
00107 {
00108 if(new_pool) apr_pool_destroy(new_pool);
00109 return rv;
00110 }
00111 if(ll_apr_warn_status(status))
00112 {
00113 if(new_pool) apr_pool_destroy(new_pool);
00114 return rv;
00115 }
00116 rv = ptr_t(new LLSocket(socket, new_pool));
00117 if(port > 0)
00118 {
00119 apr_sockaddr_t* sa = NULL;
00120 status = apr_sockaddr_info_get(
00121 &sa,
00122 APR_ANYADDR,
00123 APR_UNSPEC,
00124 port,
00125 0,
00126 new_pool);
00127 if(ll_apr_warn_status(status))
00128 {
00129 rv.reset();
00130 return rv;
00131 }
00132
00133
00134 ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
00135 status = apr_socket_bind(socket, sa);
00136 if(ll_apr_warn_status(status))
00137 {
00138 rv.reset();
00139 return rv;
00140 }
00141 lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
00142 << " socket to port: " << sa->port << llendl;
00143 if(STREAM_TCP == type)
00144 {
00145
00146
00147 lldebugs << "Setting listen state for socket." << llendl;
00148 status = apr_socket_listen(
00149 socket,
00150 LL_DEFAULT_LISTEN_BACKLOG);
00151 if(ll_apr_warn_status(status))
00152 {
00153 rv.reset();
00154 return rv;
00155 }
00156 }
00157 }
00158 else
00159 {
00160
00161
00162 port = PORT_EPHEMERAL;
00163 }
00164 rv->mPort = port;
00165 rv->setOptions();
00166 return rv;
00167 }
00168
00169
00170 LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
00171 {
00172 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00173 LLSocket::ptr_t rv;
00174 if(!socket)
00175 {
00176 return rv;
00177 }
00178 rv = ptr_t(new LLSocket(socket, pool));
00179 rv->mPort = PORT_EPHEMERAL;
00180 rv->setOptions();
00181 return rv;
00182 }
00183
00184
00185 bool LLSocket::blockingConnect(const LLHost& host)
00186 {
00187 if(!mSocket) return false;
00188 apr_sockaddr_t* sa = NULL;
00189 char ip_address[MAXADDRSTR];
00190 host.getIPString(ip_address, MAXADDRSTR);
00191 if(ll_apr_warn_status(apr_sockaddr_info_get(
00192 &sa,
00193 ip_address,
00194 APR_UNSPEC,
00195 host.getPort(),
00196 0,
00197 mPool)))
00198 {
00199 return false;
00200 }
00201 apr_socket_timeout_set(mSocket, 1000);
00202 if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
00203 setOptions();
00204 return true;
00205 }
00206
00207 LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
00208 mSocket(socket),
00209 mPool(pool),
00210 mPort(PORT_INVALID)
00211 {
00212 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00213 }
00214
00215 LLSocket::~LLSocket()
00216 {
00217 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00218
00219
00220 if(mSocket)
00221 {
00222 apr_socket_close(mSocket);
00223 }
00224 if(mPool)
00225 {
00226 apr_pool_destroy(mPool);
00227 }
00228 }
00229
00230 void LLSocket::setOptions()
00231 {
00232 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00233
00234 ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00235 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00236 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00237
00238 }
00239
00243
00244 LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
00245 mSource(socket),
00246 mInitialized(false)
00247 {
00248 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00249 }
00250
00251 LLIOSocketReader::~LLIOSocketReader()
00252 {
00253 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00254
00255 }
00256
00257
00258 LLIOPipe::EStatus LLIOSocketReader::process_impl(
00259 const LLChannelDescriptors& channels,
00260 buffer_ptr_t& buffer,
00261 bool& eos,
00262 LLSD& context,
00263 LLPumpIO* pump)
00264 {
00265 PUMP_DEBUG;
00266 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00267 if(!mSource) return STATUS_PRECONDITION_NOT_MET;
00268 if(!mInitialized)
00269 {
00270 PUMP_DEBUG;
00271
00272
00273 mInitialized = true;
00274 if(pump)
00275 {
00276 PUMP_DEBUG;
00277 lldebugs << "Initializing poll descriptor for LLIOSocketReader."
00278 << llendl;
00279 apr_pollfd_t poll_fd;
00280 poll_fd.p = NULL;
00281 poll_fd.desc_type = APR_POLL_SOCKET;
00282 poll_fd.reqevents = APR_POLLIN;
00283 poll_fd.rtnevents = 0x0;
00284 poll_fd.desc.s = mSource->getSocket();
00285 poll_fd.client_data = NULL;
00286 pump->setConditional(this, &poll_fd);
00287 }
00288 }
00289
00290
00291
00292
00293 PUMP_DEBUG;
00294 const apr_size_t READ_BUFFER_SIZE = 1024;
00295 char read_buf[READ_BUFFER_SIZE];
00296 apr_size_t len;
00297 apr_status_t status = APR_SUCCESS;
00298 do
00299 {
00300 PUMP_DEBUG;
00301 len = READ_BUFFER_SIZE;
00302 status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
00303 buffer->append(channels.out(), (U8*)read_buf, len);
00304 } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
00305 lldebugs << "socket read status: " << status << llendl;
00306 LLIOPipe::EStatus rv = STATUS_OK;
00307
00308 PUMP_DEBUG;
00309
00310 if(APR_STATUS_IS_EOF(status))
00311 {
00312
00313 if(pump)
00314 {
00315 pump->setConditional(this, NULL);
00316 }
00317 rv = STATUS_DONE;
00318 eos = true;
00319 }
00320 else if(APR_STATUS_IS_EAGAIN(status))
00321 {
00322
00323 rv = STATUS_BREAK;
00324 }
00325 else
00326 {
00327 if(ll_apr_warn_status(status))
00328 {
00329 rv = STATUS_ERROR;
00330 }
00331 }
00332 PUMP_DEBUG;
00333 return rv;
00334 }
00335
00339
00340 LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
00341 mDestination(socket),
00342 mLastWritten(NULL),
00343 mInitialized(false)
00344 {
00345 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00346 }
00347
00348 LLIOSocketWriter::~LLIOSocketWriter()
00349 {
00350 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00351
00352 }
00353
00354
00355 LLIOPipe::EStatus LLIOSocketWriter::process_impl(
00356 const LLChannelDescriptors& channels,
00357 buffer_ptr_t& buffer,
00358 bool& eos,
00359 LLSD& context,
00360 LLPumpIO* pump)
00361 {
00362 PUMP_DEBUG;
00363 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00364 if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
00365 if(!mInitialized)
00366 {
00367 PUMP_DEBUG;
00368
00369
00370 mInitialized = true;
00371 if(pump)
00372 {
00373 PUMP_DEBUG;
00374 lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
00375 << llendl;
00376 apr_pollfd_t poll_fd;
00377 poll_fd.p = NULL;
00378 poll_fd.desc_type = APR_POLL_SOCKET;
00379 poll_fd.reqevents = APR_POLLOUT;
00380 poll_fd.rtnevents = 0x0;
00381 poll_fd.desc.s = mDestination->getSocket();
00382 poll_fd.client_data = NULL;
00383 pump->setConditional(this, &poll_fd);
00384 }
00385 }
00386
00387 PUMP_DEBUG;
00388
00389
00390
00391
00392 LLBufferArray::segment_iterator_t it;
00393 LLBufferArray::segment_iterator_t end = buffer->endSegment();
00394 LLSegment segment;
00395 it = buffer->constructSegmentAfter(mLastWritten, segment);
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 PUMP_DEBUG;
00425 apr_size_t len;
00426 bool done = false;
00427 apr_status_t status = APR_SUCCESS;
00428 while(it != end)
00429 {
00430
00431 PUMP_DEBUG;
00432 if((*it).isOnChannel(channels.in()))
00433 {
00434 PUMP_DEBUG;
00435 len = (apr_size_t)segment.size();
00436 status = apr_socket_send(
00437 mDestination->getSocket(),
00438 (const char*)segment.data(),
00439 &len);
00440
00441
00442
00443
00444 if(APR_STATUS_IS_EAGAIN(status))
00445 {
00446 ll_apr_warn_status(status);
00447 break;
00448 }
00449
00450 mLastWritten = segment.data() + len - 1;
00451
00452 PUMP_DEBUG;
00453 if((S32)len < segment.size())
00454 {
00455 break;
00456 }
00457
00458 }
00459
00460 ++it;
00461 if(it != end)
00462 {
00463 segment = (*it);
00464 }
00465 else
00466 {
00467 done = true;
00468 }
00469
00470 }
00471 PUMP_DEBUG;
00472 if(done && eos)
00473 {
00474 return STATUS_DONE;
00475 }
00476 return STATUS_OK;
00477 }
00478
00479
00483
00484 LLIOServerSocket::LLIOServerSocket(
00485 apr_pool_t* pool,
00486 LLIOServerSocket::socket_t listener,
00487 factory_t factory) :
00488 mPool(pool),
00489 mListenSocket(listener),
00490 mReactor(factory),
00491 mInitialized(false),
00492 mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
00493 {
00494 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00495 }
00496
00497 LLIOServerSocket::~LLIOServerSocket()
00498 {
00499 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00500
00501 }
00502
00503 void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
00504 {
00505 mResponseTimeout = timeout_secs;
00506 }
00507
00508
00509 LLIOPipe::EStatus LLIOServerSocket::process_impl(
00510 const LLChannelDescriptors& channels,
00511 buffer_ptr_t& buffer,
00512 bool& eos,
00513 LLSD& context,
00514 LLPumpIO* pump)
00515 {
00516 PUMP_DEBUG;
00517 LLMemType m1(LLMemType::MTYPE_IO_TCP);
00518 if(!pump)
00519 {
00520 llwarns << "Need a pump for server socket." << llendl;
00521 return STATUS_ERROR;
00522 }
00523 if(!mInitialized)
00524 {
00525 PUMP_DEBUG;
00526
00527
00528
00529 lldebugs << "Initializing poll descriptor for LLIOServerSocket."
00530 << llendl;
00531 apr_pollfd_t poll_fd;
00532 poll_fd.p = NULL;
00533 poll_fd.desc_type = APR_POLL_SOCKET;
00534 poll_fd.reqevents = APR_POLLIN;
00535 poll_fd.rtnevents = 0x0;
00536 poll_fd.desc.s = mListenSocket->getSocket();
00537 poll_fd.client_data = NULL;
00538 pump->setConditional(this, &poll_fd);
00539 mInitialized = true;
00540 return STATUS_OK;
00541 }
00542
00543
00544
00545 lldebugs << "accepting socket" << llendl;
00546
00547 PUMP_DEBUG;
00548 apr_pool_t* new_pool = NULL;
00549 apr_status_t status = apr_pool_create(&new_pool, mPool);
00550 apr_socket_t* socket = NULL;
00551 status = apr_socket_accept(
00552 &socket,
00553 mListenSocket->getSocket(),
00554 new_pool);
00555 LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
00556
00557 if(llsocket)
00558 {
00559 PUMP_DEBUG;
00560
00561 apr_sockaddr_t* remote_addr;
00562 apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
00563
00564 char* remote_host_string;
00565 apr_sockaddr_ip_get(&remote_host_string, remote_addr);
00566
00567 LLSD context;
00568 context["remote-host"] = remote_host_string;
00569 context["remote-port"] = remote_addr->port;
00570
00571 LLPumpIO::chain_t chain;
00572 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
00573 if(mReactor->build(chain, context))
00574 {
00575 chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
00576 pump->addChain(chain, mResponseTimeout);
00577 status = STATUS_OK;
00578 }
00579 else
00580 {
00581 llwarns << "Unable to build reactor to socket." << llendl;
00582 }
00583 }
00584 else
00585 {
00586 llwarns << "Unable to create linden socket." << llendl;
00587 }
00588
00589 PUMP_DEBUG;
00590
00591
00592 return STATUS_OK;
00593 }
00594
00595
00596 #if 0
00597 LLIODataSocket::LLIODataSocket(
00598 U16 suggested_port,
00599 U16 start_discovery_port,
00600 apr_pool_t* pool) :
00601 mSocket(NULL)
00602 {
00603 if(!pool || (PORT_INVALID == suggested_port)) return;
00604 if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
00605 apr_sockaddr_t* sa = NULL;
00606 if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
00607 apr_status_t status = apr_socket_bind(mSocket, sa);
00608 if((start_discovery_port > 0) && is_addr_in_use(status))
00609 {
00610 const U16 MAX_ATTEMPT_PORTS = 50;
00611 for(U16 attempt_port = start_discovery_port;
00612 attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
00613 ++attempt_port)
00614 {
00615 sa->port = attempt_port;
00616 sa->sa.sin.sin_port = htons(attempt_port);
00617 status = apr_socket_bind(mSocket, sa);
00618 if(APR_SUCCESS == status) break;
00619 if(is_addr_in_use(status)) continue;
00620 (void)ll_apr_warn_status(status);
00621 }
00622 }
00623 if(ll_apr_warn_status(status)) return;
00624 if(sa->port)
00625 {
00626 lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
00627 mPort = sa->port;
00628 }
00629 else
00630 {
00631 mPort = LLIOSocket::PORT_EPHEMERAL;
00632 }
00633
00634
00635 ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
00636 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
00637 ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
00638 }
00639
00640 LLIODataSocket::~LLIODataSocket()
00641 {
00642 }
00643
00644
00645 #endif