00001
00032 #include "linden_common.h"
00033
00034 #include "llxfermanager.h"
00035
00036 #include "llxfer.h"
00037 #include "llxfer_file.h"
00038 #include "llxfer_mem.h"
00039 #include "llxfer_vfile.h"
00040
00041 #include "llerror.h"
00042 #include "lluuid.h"
00043 #include "u64.h"
00044
00045 const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f; // max ACKTimer elapsed time a REGISTERED outgoing xfer may wait before being aborted (presumably seconds - TODO confirm)
00046 const F32 LL_PACKET_TIMEOUT = 3.0f; // max ACKTimer elapsed time to wait for an ACK before resending (presumably seconds - TODO confirm)
00047 const S32 LL_PACKET_RETRY_LIMIT = 10; // an outgoing xfer is dropped once its mRetries exceeds this value
00048
00049 const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10; // default passed to setMaxOutgoingXfersPerCircuit() by init()
00050 const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000; // default passed to setMaxIncomingXfers() by init()
00051
00052 #define LL_XFER_PROGRESS_MESSAGES 0 // non-zero enables the console progress output in sendConfirmPacket()
00053 #define LL_XFER_TEST_REXMIT 0 // debug switch, disabled; presumably meant for retransmit testing - TODO confirm
00054
00055
00057
00058 LLXferManager::LLXferManager (LLVFS *vfs)
00059 {
00060 init(vfs);
00061 }
00062
00064
00065 LLXferManager::~LLXferManager ()
00066 {
00067 free();
00068 }
00069
00071
00072 void LLXferManager::init (LLVFS *vfs)
00073 {
00074 mSendList = NULL;
00075 mReceiveList = NULL;
00076
00077 setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
00078 setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
00079
00080 mVFS = vfs;
00081
00082
00083 mUseAckThrottling = FALSE;
00084 setAckThrottleBPS(100000);
00085 }
00086
00088
00089 void LLXferManager::free ()
00090 {
00091 LLXfer *xferp;
00092 LLXfer *delp;
00093
00094 for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());
00095 mOutgoingHosts.clear();
00096
00097 delp = mSendList;
00098 while (delp)
00099 {
00100 xferp = delp->mNext;
00101 delete delp;
00102 delp = xferp;
00103 }
00104 mSendList = NULL;
00105
00106 delp = mReceiveList;
00107 while (delp)
00108 {
00109 xferp = delp->mNext;
00110 delete delp;
00111 delp = xferp;
00112 }
00113 mReceiveList = NULL;
00114 }
00115
00117
00118 void LLXferManager::setMaxIncomingXfers(S32 max_num)
00119 {
00120 mMaxIncomingXfers = max_num;
00121 }
00122
00124
00125 void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
00126 {
00127 mMaxOutgoingXfersPerCircuit = max_num;
00128 }
00129
00130 void LLXferManager::setUseAckThrottling(const BOOL use)
00131 {
00132 mUseAckThrottling = use;
00133 }
00134
00135 void LLXferManager::setAckThrottleBPS(const F32 bps)
00136 {
00137
00138
00139
00140
00141
00142 F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;
00143
00144
00145 F32 actual_rate = llmax(min_bps*1.1f, bps);
00146 LL_DEBUGS("AppInit") << "LLXferManager ack throttle min rate: " << min_bps << LL_ENDL;
00147 LL_DEBUGS("AppInit") << "LLXferManager ack throttle actual rate: " << actual_rate << LL_ENDL;
00148 mAckThrottle.setRate(actual_rate);
00149 }
00150
00151
00153
00154 void LLXferManager::updateHostStatus()
00155 {
00156 LLXfer *xferp;
00157 LLHostStatus *host_statusp = NULL;
00158
00159 for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());
00160 mOutgoingHosts.clear();
00161
00162 for (xferp = mSendList; xferp; xferp = xferp->mNext)
00163 {
00164 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00165 iter != mOutgoingHosts.end(); ++iter)
00166 {
00167 host_statusp = *iter;
00168 if (host_statusp->mHost == xferp->mRemoteHost)
00169 {
00170 break;
00171 }
00172 }
00173 if (!host_statusp)
00174 {
00175 host_statusp = new LLHostStatus();
00176 if (host_statusp)
00177 {
00178 host_statusp->mHost = xferp->mRemoteHost;
00179 mOutgoingHosts.push_front(host_statusp);
00180 }
00181 }
00182 if (host_statusp)
00183 {
00184 if (xferp->mStatus == e_LL_XFER_PENDING)
00185 {
00186 host_statusp->mNumPending++;
00187 }
00188 else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00189 {
00190 host_statusp->mNumActive++;
00191 }
00192 }
00193
00194 }
00195 }
00196
00198
00199 void LLXferManager::printHostStatus()
00200 {
00201 LLHostStatus *host_statusp = NULL;
00202 if (!mOutgoingHosts.empty())
00203 {
00204 llinfos << "Outgoing Xfers:" << llendl;
00205
00206 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00207 iter != mOutgoingHosts.end(); ++iter)
00208 {
00209 host_statusp = *iter;
00210 llinfos << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << llendl;
00211 }
00212 }
00213 }
00214
00216
00217 LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
00218 {
00219 LLXfer *xferp;
00220 for (xferp = list_head; xferp; xferp = xferp->mNext)
00221 {
00222 if (xferp->mID == id)
00223 {
00224 return(xferp);
00225 }
00226 }
00227 return(NULL);
00228 }
00229
00230
00232
00233 void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
00234 {
00235
00236
00237 if (delp)
00238 {
00239 if (*list_head == delp)
00240 {
00241 *list_head = delp->mNext;
00242 delete (delp);
00243 }
00244 else
00245 {
00246 LLXfer *xferp = *list_head;
00247 while (xferp->mNext)
00248 {
00249 if (xferp->mNext == delp)
00250 {
00251 xferp->mNext = delp->mNext;
00252 delete (delp);
00253 break;
00254 }
00255 xferp = xferp->mNext;
00256 }
00257 }
00258 }
00259 }
00260
00262
00263 U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
00264 {
00265 U32 num_entries = 0;
00266
00267 while (list_head)
00268 {
00269 if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS))
00270 {
00271 num_entries++;
00272 }
00273 list_head = list_head->mNext;
00274 }
00275 return(num_entries);
00276 }
00277
00279
00280 S32 LLXferManager::numPendingXfers(const LLHost &host)
00281 {
00282 LLHostStatus *host_statusp = NULL;
00283
00284 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00285 iter != mOutgoingHosts.end(); ++iter)
00286 {
00287 host_statusp = *iter;
00288 if (host_statusp->mHost == host)
00289 {
00290 return (host_statusp->mNumPending);
00291 }
00292 }
00293 return 0;
00294 }
00295
00297
00298 S32 LLXferManager::numActiveXfers(const LLHost &host)
00299 {
00300 LLHostStatus *host_statusp = NULL;
00301
00302 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00303 iter != mOutgoingHosts.end(); ++iter)
00304 {
00305 host_statusp = *iter;
00306 if (host_statusp->mHost == host)
00307 {
00308 return (host_statusp->mNumActive);
00309 }
00310 }
00311 return 0;
00312 }
00313
00315
00316 void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta)
00317 {
00318 LLHostStatus *host_statusp = NULL;
00319
00320 for (status_list_t::iterator iter = mOutgoingHosts.begin();
00321 iter != mOutgoingHosts.end(); ++iter)
00322 {
00323 host_statusp = *iter;
00324 if (host_statusp->mHost == host)
00325 {
00326 host_statusp->mNumActive += delta;
00327 }
00328 }
00329 }
00330
00332
00333 void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem)
00334 {
00335 msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket, process_confirm_packet, NULL);
00336 msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer, process_request_xfer, NULL);
00337 msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket, continue_file_receive, NULL);
00338 msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer, process_abort_xfer, NULL);
00339 }
00340
00342
00343 U64 LLXferManager::getNextID ()
00344 {
00345 LLUUID a_guid;
00346
00347 a_guid.generate();
00348
00349
00350 return(*((U64*)(a_guid.mData)));
00351 }
00352
00354
00355 S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF)
00356 {
00357 if (is_EOF)
00358 {
00359 packet_num |= 0x80000000;
00360 }
00361 return packet_num;
00362 }
00363
00365
00366 S32 LLXferManager::decodePacketNum(S32 packet_num)
00367 {
00368 return(packet_num & 0x0FFFFFFF);
00369 }
00370
00372
00373 BOOL LLXferManager::isLastPacket(S32 packet_num)
00374 {
00375 return(packet_num & 0x80000000);
00376 }
00377
00379
00380 U64 LLXferManager::registerXfer(const void *datap, const S32 length)
00381 {
00382 LLXfer *xferp;
00383 U64 xfer_id = getNextID();
00384
00385 xferp = (LLXfer *) new LLXfer_Mem();
00386 if (xferp)
00387 {
00388 xferp->mNext = mSendList;
00389 mSendList = xferp;
00390
00391 xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
00392
00393 if (!xfer_id)
00394 {
00395 removeXfer(xferp,&mSendList);
00396 }
00397 }
00398 else
00399 {
00400 llerrs << "Xfer allocation error" << llendl;
00401 xfer_id = 0;
00402 }
00403
00404 return(xfer_id);
00405 }
00406
00408
00409 void LLXferManager::requestFile(const char* local_filename,
00410 const char* remote_filename,
00411 ELLPath remote_path,
00412 const LLHost& remote_host,
00413 BOOL delete_remote_on_completion,
00414 void (*callback)(void**,S32,LLExtStat),
00415 void** user_data,
00416 BOOL is_priority,
00417 BOOL use_big_packets)
00418 {
00419 LLXfer *xferp;
00420
00421 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00422 {
00423 if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
00424 && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
00425 && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
00426 && (remote_host == xferp->mRemoteHost)
00427 && (callback == xferp->mCallback)
00428 && (user_data == xferp->mCallbackDataHandle))
00429
00430 {
00431
00432 return;
00433 }
00434 }
00435
00436 S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
00437 xferp = (LLXfer *) new LLXfer_File(chunk_size);
00438 if (xferp)
00439 {
00440 addToList(xferp, mReceiveList, is_priority);
00441
00442
00443
00444
00445
00446 if(delete_remote_on_completion &&
00447 (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4]))
00448 {
00449 LLFile::remove(local_filename);
00450 }
00451 ((LLXfer_File *)xferp)->initializeRequest(
00452 getNextID(),
00453 local_filename,
00454 remote_filename,
00455 remote_path,
00456 remote_host,
00457 delete_remote_on_completion,
00458 callback,user_data);
00459 startPendingDownloads();
00460 }
00461 else
00462 {
00463 llerrs << "Xfer allocation error" << llendl;
00464 }
00465 }
00466
00467 void LLXferManager::requestFile(const char* remote_filename,
00468 ELLPath remote_path,
00469 const LLHost& remote_host,
00470 BOOL delete_remote_on_completion,
00471 void (*callback)(void*,S32,void**,S32,LLExtStat),
00472 void** user_data,
00473 BOOL is_priority)
00474 {
00475 LLXfer *xferp;
00476
00477 xferp = (LLXfer *) new LLXfer_Mem();
00478 if (xferp)
00479 {
00480 addToList(xferp, mReceiveList, is_priority);
00481 ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
00482 remote_filename,
00483 remote_path,
00484 remote_host,
00485 delete_remote_on_completion,
00486 callback, user_data);
00487 startPendingDownloads();
00488 }
00489 else
00490 {
00491 llerrs << "Xfer allocation error" << llendl;
00492 }
00493 }
00494
00495 void LLXferManager::requestVFile(const LLUUID& local_id,
00496 const LLUUID& remote_id,
00497 LLAssetType::EType type, LLVFS* vfs,
00498 const LLHost& remote_host,
00499 void (*callback)(void**,S32,LLExtStat),
00500 void** user_data,
00501 BOOL is_priority)
00502 {
00503 LLXfer *xferp;
00504
00505 for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
00506 {
00507 if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
00508 && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
00509 && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
00510 && (remote_host == xferp->mRemoteHost)
00511 && (callback == xferp->mCallback)
00512 && (user_data == xferp->mCallbackDataHandle))
00513
00514 {
00515
00516 return;
00517 }
00518 }
00519
00520 xferp = (LLXfer *) new LLXfer_VFile();
00521 if (xferp)
00522 {
00523 addToList(xferp, mReceiveList, is_priority);
00524 ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
00525 vfs,
00526 local_id,
00527 remote_id,
00528 type,
00529 remote_host,
00530 callback,
00531 user_data);
00532 startPendingDownloads();
00533 }
00534 else
00535 {
00536 llerrs << "Xfer allocation error" << llendl;
00537 }
00538
00539 }
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00602
00603 void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** )
00604 {
00605
00606 const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
00607 char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4];
00608 S32 fdata_size;
00609 U64 id;
00610 S32 packetnum;
00611 LLXfer * xferp;
00612
00613 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00614 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
00615
00616 fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
00617 mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
00618
00619 xferp = findXfer(id, mReceiveList);
00620
00621 if (!xferp)
00622 {
00623 char U64_BUF[MAX_STRING];
00624 llwarns << "received xfer data from " << mesgsys->getSender()
00625 << " for non-existent xfer id: "
00626 << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl;
00627 return;
00628 }
00629
00630 S32 xfer_size;
00631
00632 if (decodePacketNum(packetnum) != xferp->mPacketNum)
00633 {
00634
00635 if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
00636 {
00637 llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00638 }
00639 else
00640 {
00641 llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl;
00642 }
00643 return;
00644 }
00645
00646 S32 result = 0;
00647
00648 if (xferp->mPacketNum == 0)
00649 {
00650 ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32));
00651
00652
00653 xferp->setXferSize(xfer_size);
00654
00655
00656 result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32)));
00657 }
00658 else
00659 {
00660 result = xferp->receiveData(fdata_buf,fdata_size);
00661 }
00662
00663 if (result == LL_ERR_CANNOT_OPEN_FILE)
00664 {
00665 xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
00666 removeXfer(xferp,&mReceiveList);
00667 startPendingDownloads();
00668 return;
00669 }
00670
00671 xferp->mPacketNum++;
00672
00673 if (!mUseAckThrottling)
00674 {
00675
00676 sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
00677 }
00678 else
00679 {
00680
00681 LLXferAckInfo ack_info;
00682 ack_info.mID = id;
00683 ack_info.mPacketNum = decodePacketNum(packetnum);
00684 ack_info.mRemoteHost = mesgsys->getSender();
00685 mXferAckQueue.push(ack_info);
00686 }
00687
00688 if (isLastPacket(packetnum))
00689 {
00690 xferp->processEOF();
00691 removeXfer(xferp,&mReceiveList);
00692 startPendingDownloads();
00693 }
00694 }
00695
00697
00698 void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
00699 {
00700 #if LL_XFER_PROGRESS_MESSAGES
00701 if (!(packetnum % 50))
00702 {
00703 cout << "confirming xfer packet #" << packetnum << endl;
00704 }
00705 #endif
00706 mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket);
00707 mesgsys->nextBlockFast(_PREHASH_XferID);
00708 mesgsys->addU64Fast(_PREHASH_ID, id);
00709 mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
00710
00711 mesgsys->sendMessage(remote_host);
00712 }
00713
00715
00716 void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** )
00717 {
00718
00719 U64 id;
00720 char local_filename[MAX_STRING];
00721 ELLPath local_path = LL_PATH_NONE;
00722 S32 result = LL_ERR_NOERR;
00723 LLUUID uuid;
00724 LLAssetType::EType type;
00725 S16 type_s16;
00726 BOOL b_use_big_packets;
00727
00728 mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets);
00729
00730 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00731 char U64_BUF[MAX_STRING];
00732 llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
00733 << " to " << mesgsys->getSender() << llendl;
00734
00735 mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename);
00736
00737 U8 local_path_u8;
00738 mesgsys->getU8("XferID", "FilePath", local_path_u8);
00739 if( local_path_u8 < (U8)LL_PATH_COUNT )
00740 {
00741 local_path = (ELLPath)local_path_u8;
00742 }
00743 else
00744 {
00745 llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl;
00746 }
00747
00748 mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid);
00749 mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16);
00750 type = (LLAssetType::EType)type_s16;
00751
00752 LLXfer *xferp;
00753
00754 if (uuid != LLUUID::null)
00755 {
00756 if(NULL == LLAssetType::lookup(type))
00757 {
00758 llwarns << "Invalid type for xfer request: " << uuid << ":"
00759 << type_s16 << " to " << mesgsys->getSender() << llendl;
00760 return;
00761 }
00762
00763 llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl;
00764
00765 if (! mVFS)
00766 {
00767 llwarns << "Attempt to send VFile w/o available VFS" << llendl;
00768 return;
00769 }
00770
00771 xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
00772 if (xferp)
00773 {
00774 xferp->mNext = mSendList;
00775 mSendList = xferp;
00776 result = xferp->startSend(id,mesgsys->getSender());
00777 }
00778 else
00779 {
00780 llerrs << "Xfer allcoation error" << llendl;
00781 }
00782 }
00783 else if (strlen(local_filename))
00784 {
00785 std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename );
00786 llinfos << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << llendl;
00787
00788 BOOL delete_local_on_completion = FALSE;
00789 mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
00790
00791
00792 xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1);
00793
00794 if (xferp)
00795 {
00796 xferp->mNext = mSendList;
00797 mSendList = xferp;
00798 result = xferp->startSend(id,mesgsys->getSender());
00799 }
00800 else
00801 {
00802 llerrs << "Xfer allcoation error" << llendl;
00803 }
00804 }
00805 else
00806 {
00807 char U64_BUF[MAX_STRING];
00808 llinfos << "starting memory transfer: "
00809 << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
00810 << mesgsys->getSender() << llendl;
00811
00812 xferp = findXfer(id, mSendList);
00813
00814 if (xferp)
00815 {
00816 result = xferp->startSend(id,mesgsys->getSender());
00817 }
00818 else
00819 {
00820 llinfos << "Warning: " << U64_BUF << " not found." << llendl;
00821 result = LL_ERR_FILE_NOT_FOUND;
00822 }
00823 }
00824
00825 if (result)
00826 {
00827 if (xferp)
00828 {
00829 xferp->abort(result);
00830 removeXfer(xferp,&mSendList);
00831 }
00832 else
00833 {
00834 llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl;
00835
00836 mesgsys->newMessageFast(_PREHASH_AbortXfer);
00837 mesgsys->nextBlockFast(_PREHASH_XferID);
00838 mesgsys->addU64Fast(_PREHASH_ID, id);
00839 mesgsys->addS32Fast(_PREHASH_Result, result);
00840
00841 mesgsys->sendMessage(mesgsys->getSender());
00842 }
00843 }
00844 else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
00845 {
00846 xferp->sendNextPacket();
00847 changeNumActiveXfers(xferp->mRemoteHost,1);
00848
00849 }
00850 else
00851 {
00852 if(xferp)
00853 {
00854 llinfos << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl;
00855 }
00856 else
00857 {
00858 llwarns << "LLXferManager::processFileRequest() - no xfer found!"
00859 << llendl;
00860 }
00861 }
00862 }
00863
00865
00866 void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** )
00867 {
00868 U64 id = 0;
00869 S32 packetNum = 0;
00870
00871 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
00872 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
00873
00874 LLXfer* xferp = findXfer(id, mSendList);
00875 if (xferp)
00876 {
00877
00878 xferp->mWaitingForACK = FALSE;
00879 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00880 {
00881 xferp->sendNextPacket();
00882 }
00883 else
00884 {
00885 removeXfer(xferp, &mSendList);
00886 }
00887 }
00888 }
00889
00891
00892 void LLXferManager::retransmitUnackedPackets ()
00893 {
00894 LLXfer *xferp;
00895 LLXfer *delp;
00896 xferp = mReceiveList;
00897 while(xferp)
00898 {
00899 if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
00900 {
00901
00902 if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
00903 {
00904 llinfos << "Xfer found in progress on dead circuit, aborting" << llendl;
00905 xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
00906 xferp->processEOF();
00907 delp = xferp;
00908 xferp = xferp->mNext;
00909 removeXfer(delp,&mReceiveList);
00910 continue;
00911 }
00912
00913 }
00914 xferp = xferp->mNext;
00915 }
00916
00917 xferp = mSendList;
00918 updateHostStatus();
00919 F32 et;
00920 while (xferp)
00921 {
00922 if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
00923 {
00924 if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
00925 {
00926 llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl;
00927 xferp->abort(LL_ERR_TCP_TIMEOUT);
00928 delp = xferp;
00929 xferp = xferp->mNext;
00930 removeXfer(delp,&mSendList);
00931 }
00932 else
00933 {
00934 llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl;
00935 xferp->resendLastPacket();
00936 xferp = xferp->mNext;
00937 }
00938 }
00939 else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
00940 {
00941 llinfos << "registered xfer never requested, xfer dropped" << llendl;
00942 xferp->abort(LL_ERR_TCP_TIMEOUT);
00943 delp = xferp;
00944 xferp = xferp->mNext;
00945 removeXfer(delp,&mSendList);
00946 }
00947 else if (xferp->mStatus == e_LL_XFER_ABORTED)
00948 {
00949 llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl;
00950 delp = xferp;
00951 xferp = xferp->mNext;
00952 removeXfer(delp,&mSendList);
00953 }
00954 else if (xferp->mStatus == e_LL_XFER_PENDING)
00955 {
00956
00957 if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
00958 {
00959
00960 xferp->sendNextPacket();
00961 changeNumActiveXfers(xferp->mRemoteHost,1);
00962 }
00963 xferp = xferp->mNext;
00964 }
00965 else
00966 {
00967 xferp = xferp->mNext;
00968 }
00969 }
00970
00971
00972
00973
00974
00975
00976 while (mXferAckQueue.getLength())
00977 {
00978 if (mAckThrottle.checkOverflow(1000.0f*8.0f))
00979 {
00980 break;
00981 }
00982
00983 LLXferAckInfo ack_info;
00984 mXferAckQueue.pop(ack_info);
00985
00986 sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
00987 mAckThrottle.throttleOverflow(1000.f*8.f);
00988 }
00989 }
00990
00991
00993
00994 void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** )
00995 {
00996 U64 id = 0;
00997 S32 result_code = 0;
00998 LLXfer * xferp;
00999
01000 mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
01001 mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
01002
01003 xferp = findXfer(id, mReceiveList);
01004 if (xferp)
01005 {
01006 xferp->mCallbackResult = result_code;
01007 xferp->processEOF();
01008 removeXfer(xferp, &mReceiveList);
01009 startPendingDownloads();
01010 }
01011 }
01012
01014
01015 void LLXferManager::startPendingDownloads()
01016 {
01017
01018
01019
01020
01021
01022
01023
01024 LLXfer* xferp = mReceiveList;
01025 std::list<LLXfer*> pending_downloads;
01026 S32 download_count = 0;
01027 S32 pending_count = 0;
01028 while(xferp)
01029 {
01030 if(xferp->mStatus == e_LL_XFER_PENDING)
01031 {
01032 ++pending_count;
01033 pending_downloads.push_front(xferp);
01034 }
01035 else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
01036 {
01037 ++download_count;
01038 }
01039 xferp = xferp->mNext;
01040 }
01041
01042 S32 start_count = mMaxIncomingXfers - download_count;
01043
01044 lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
01045 << download_count << " XFER_PENDING: " << pending_count
01046 << " startring " << llmin(start_count, pending_count) << llendl;
01047
01048 if((start_count > 0) && (pending_count > 0))
01049 {
01050 S32 result;
01051 for (std::list<LLXfer*>::iterator iter = pending_downloads.begin();
01052 iter != pending_downloads.end(); ++iter)
01053 {
01054 xferp = *iter;
01055 if (start_count-- <= 0)
01056 break;
01057 result = xferp->startDownload();
01058 if(result)
01059 {
01060 xferp->abort(result);
01061 ++start_count;
01062 }
01063 }
01064 }
01065 }
01066
01068
01069 void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
01070 {
01071 if(is_priority)
01072 {
01073 xferp->mNext = NULL;
01074 LLXfer* next = head;
01075 if(next)
01076 {
01077 while(next->mNext)
01078 {
01079 next = next->mNext;
01080 }
01081 next->mNext = xferp;
01082 }
01083 else
01084 {
01085 head = xferp;
01086 }
01087 }
01088 else
01089 {
01090 xferp->mNext = head;
01091 head = xferp;
01092 }
01093 }
01094
01096
01098
01099 LLXferManager *gXferManager = NULL; // global manager instance used by the message handlers below; created by start_xfer_manager(), destroyed by cleanup_xfer_manager()
01100
01101
01102 void start_xfer_manager(LLVFS *vfs)
01103 {
01104 gXferManager = new LLXferManager(vfs);
01105 }
01106
01107 void cleanup_xfer_manager()
01108 {
01109 if (gXferManager)
01110 {
01111 delete(gXferManager);
01112 gXferManager = NULL;
01113 }
01114 }
01115
01116 void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data)
01117 {
01118 gXferManager->processConfirmation(mesgsys,user_data);
01119 }
01120
01121 void process_request_xfer(LLMessageSystem *mesgsys, void **user_data)
01122 {
01123 gXferManager->processFileRequest(mesgsys,user_data);
01124 }
01125
01126 void continue_file_receive(LLMessageSystem *mesgsys, void **user_data)
01127 {
01128 #if LL_TEST_XFER_REXMIT
01129 if (ll_frand() > 0.05f)
01130 {
01131 #endif
01132 gXferManager->processReceiveData(mesgsys,user_data);
01133 #if LL_TEST_XFER_REXMIT
01134 }
01135 else
01136 {
01137 cout << "oops! dropped a xfer packet" << endl;
01138 }
01139 #endif
01140 }
01141
01142 void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data)
01143 {
01144 gXferManager->processAbort(mesgsys,user_data);
01145 }
01146
01147
01148
01149
01150
01151
01152
01153
01154
01155
01156
01157
01158
01159
01160
01161
01162
01163
01164
01165
01166
01167
01168
01169
01170
01171